From 0ee22ca331c950c938eee78d527938d1f05aae0a Mon Sep 17 00:00:00 2001 From: zzh Date: Tue, 26 Nov 2024 23:51:16 +0800 Subject: [PATCH] =?UTF-8?q?=E6=AD=A3=E7=A1=AE=E6=80=A7=E6=B2=A1=E6=9C=89?= =?UTF-8?q?=E9=97=AE=E9=A2=98=E4=BA=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/launch.json | 7 ++-- run.sh | 18 ++++++---- src/sam/markdups/markdups.cpp | 47 +++++++++++++++----------- src/sam/markdups/md_funcs.cpp | 2 +- src/sam/markdups/md_types.h | 57 -------------------------------- src/sam/markdups/pipeline_md.cpp | 12 ++++--- src/sam/markdups/shared_args.h | 4 +-- src/sam/utils/read_ends.h | 2 ++ src/sam/utils/read_name_parser.h | 4 +++ 9 files changed, 59 insertions(+), 94 deletions(-) diff --git a/.vscode/launch.json b/.vscode/launch.json index 78e15fb..c5311a0 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -13,14 +13,13 @@ "program": "${workspaceRoot}/build/bin/picard_cpp", "args": [ "MarkDuplicates", - "--INPUT", "/home/zzh/data/wgs_na12878.bam", + "--INPUT", "/home/zzh/data/bam/100w.bam", "--OUTPUT", "./out.sam", "--METRICS_FILE", "metrics.txt", - "--num_threads", "32", + "--num_threads", "1", "--max_mem", "2G", "--verbosity", "DEBUG", - "--asyncio", "true", - "--READ_NAME_REGEX", "" + "--asyncio", "true" ], "cwd": "${workspaceFolder}", // 当前工作路径:当前文件所在的工作空间 } diff --git a/run.sh b/run.sh index cb0e7e2..847a717 100755 --- a/run.sh +++ b/run.sh @@ -1,20 +1,24 @@ -nthread=1 +#nthread=1 #nthread=2 #nthread=4 #nthread=8 #nthread=16 -#nthread=32 -#nthread=32 +nthread=32 +#nthread=64 #nthread=128 #input=/home/zzh/data/bam/zy_normal.bam +#input=/home/zzh/data/bam/normal_all.sam #input=/home/zzh/data/bam/zy_tumor.bam -input=/home/zzh/data/wgs_na12878.bam -#input=~/data/bam/100w.bam +#input=/home/zzh/data/wgs_na12878.bam +input=~/data/bam/100w.bam #input=~/data/bam/t100w.sam #input=~/data/bam/1k.sam #input=~/data/bam/1kw.sam #input=~/data/bam/1kw.bam #input=~/data/bam/n1kw.sam +#input=~/data/bam/tumor_small.bam +#input=~/data/bam/normal_small.bam +#input=~/data/bam/normal_30.bam output=/home/zzh/data1/na12878_out.bam cd ./build/ && make -j 8 && cd .. @@ -24,10 +28,10 @@ time /home/zzh/work/ngs/picard_cpp/build/bin/picard_cpp \ --INPUT $input \ --OUTPUT $output \ --INDEX_FORMAT BAI \ - --num_threads $nthread \ +--num_threads $nthread \ --max_mem 2G \ --verbosity DEBUG \ - --asyncio true -TAG_DUPLICATE_SET_MEMBERS true #--READ_NAME_REGEX null # + --asyncio true -TAG_DUPLICATE_SET_MEMBERS true --READ_NAME_REGEX null # #--TAG_DUPLICATE_SET_MEMBERS true #--READ_NAME_REGEX "" #--READ_NAME_REGEX ".*?([0-9]+):([0-9]+):([0-9]+)$" #--READ_NAME_REGEX "" diff --git a/src/sam/markdups/markdups.cpp b/src/sam/markdups/markdups.cpp index 40bd2e3..09c6809 100644 --- a/src/sam/markdups/markdups.cpp +++ b/src/sam/markdups/markdups.cpp @@ -54,10 +54,10 @@ sam_hdr_t *g_outBamHeader; // 输出文件的header GlobalArg &g_gArg = GlobalArg::Instance(); static MarkDupsArg g_mdArg_; MarkDupsArg &g_mdArg = g_mdArg_; -static GlobalDataArg gData_; -GlobalDataArg &gData = gData_; DuplicationMetrics gMetrics_; -DuplicationMetrics &gMetrics = gMetrics_; // 记录统计信息 +DuplicationMetrics &gMetrics = gMetrics_; // 记录统计信息 +PipelineArg gPipe_; +PipelineArg &gPipe = gPipe_; /* 程序版本等信息 */ const char *PROGRAM_GROUP_VERSION = "3.2.0"; @@ -127,7 +127,7 @@ int MarkDuplicates(int argc, char *argv[]) { Timer time_all; /* 读取命令行参数 */ - g_mdArg.parseArgument(argc, argv, &g_gArg); // 解析命令行参数 + g_mdArg.parseArgument(argc, argv, &g_gArg); // 解析命令行参数 if (g_gArg.num_threads < 1) g_gArg.num_threads = 1; // 线程数不能小于1 @@ -152,7 +152,7 @@ int MarkDuplicates(int argc, char *argv[]) { htsThreadPool htsPoolWrite = {NULL, 0}; // 读写用不同的线程池 // htsPoolRead.pool = hts_tpool_init(g_gArg.num_threads); // htsPoolWrite.pool = hts_tpool_init(g_gArg.num_threads); - htsPoolRead.pool = hts_tpool_init(16); + htsPoolRead.pool = hts_tpool_init(32); htsPoolWrite.pool = hts_tpool_init(32); if (!htsPoolRead.pool || !htsPoolWrite.pool) { Error("[%d] failed to set up thread pool", __LINE__); @@ -219,9 +219,9 @@ int MarkDuplicates(int argc, char *argv[]) { inBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem); DupIdxQueue dupIdxQue, repIdxQue; DupIdxQueue opticalIdxQue; - dupIdxQue.Init(&gData.dupIdxArr); - repIdxQue.Init(&gData.repIdxArr); - opticalIdxQue.Init(&gData.opticalDupIdxArr); + dupIdxQue.Init(&gPipe.intersectData.dupIdxArr); + repIdxQue.Init(&gPipe.intersectData.repIdxArr); + opticalIdxQue.Init(&gPipe.intersectData.opticalDupIdxArr); Timer tw; cout << "dupsize: " << dupIdxQue.Size() << endl; uint64_t bamIdx = 0; @@ -237,7 +237,7 @@ int MarkDuplicates(int argc, char *argv[]) { uint32_t duplicateSetSize = 0; int64_t realDupSize = 0; - exit(0); + // exit(0); while (inBuf.ReadStat() >= 0) { Timer tw1; size_t readNum = inBuf.ReadBam(); @@ -292,6 +292,9 @@ int MarkDuplicates(int argc, char *argv[]) { } // 添加冗余标识 bw->SetDuplicateReadFlag(true); + if (isOpticalDup) { + // cerr << bamIdx << endl; + } uint8_t *oldTagVal = bam_aux_get(bw->b, g_mdArg.DUPLICATE_TYPE_TAG.c_str()); if (oldTagVal != NULL) bam_aux_del(bw->b, oldTagVal); @@ -331,15 +334,19 @@ int MarkDuplicates(int argc, char *argv[]) { } if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS && isInDuplicateSet) { - // cerr << bamIdx << " " << representativeReadIndexInFile << " " << duplicateSetSize << endl; - uint8_t *oldTagVal = bam_aux_get(bw->b, g_mdArg.DUPLICATE_SET_INDEX_TAG.c_str()); - if (oldTagVal != NULL) bam_aux_del(bw->b, oldTagVal); - bam_aux_append(bw->b, g_mdArg.DUPLICATE_SET_INDEX_TAG.c_str(), 'i', - sizeof(representativeReadIndexInFile), (uint8_t *)&representativeReadIndexInFile); - oldTagVal = bam_aux_get(bw->b, g_mdArg.DUPLICATE_SET_SIZE_TAG.c_str()); - if (oldTagVal != NULL) bam_aux_del(bw->b, oldTagVal); - bam_aux_append(bw->b, g_mdArg.DUPLICATE_SET_SIZE_TAG.c_str(), 'i', sizeof(duplicateSetSize), - (const uint8_t *)&duplicateSetSize); + if (!bw->IsSecondaryOrSupplementary() && !bw->GetReadUnmappedFlag()) { + cerr << bamIdx << " " << representativeReadIndexInFile << " " << duplicateSetSize << endl; + uint8_t *oldTagVal = bam_aux_get(bw->b, g_mdArg.DUPLICATE_SET_INDEX_TAG.c_str()); + if (oldTagVal != NULL) + bam_aux_del(bw->b, oldTagVal); + bam_aux_append(bw->b, g_mdArg.DUPLICATE_SET_INDEX_TAG.c_str(), 'i', + sizeof(representativeReadIndexInFile), (uint8_t *)&representativeReadIndexInFile); + oldTagVal = bam_aux_get(bw->b, g_mdArg.DUPLICATE_SET_SIZE_TAG.c_str()); + if (oldTagVal != NULL) + bam_aux_del(bw->b, oldTagVal); + bam_aux_append(bw->b, g_mdArg.DUPLICATE_SET_SIZE_TAG.c_str(), 'i', sizeof(duplicateSetSize), + (const uint8_t *)&duplicateSetSize); + } } // 每个read都要写到output,只是添加标识,或者删掉这些冗余record ++bamIdx; @@ -355,12 +362,14 @@ int MarkDuplicates(int argc, char *argv[]) { bam_aux_append(bw->b, "PG", 'Z', g_mdArg.PROGRAM_RECORD_ID.size() + 1, (const uint8_t *)g_mdArg.PROGRAM_RECORD_ID.c_str()); } +#if 1 if (sam_write1(g_outBamFp, g_outBamHeader, bw->b) < 0) { Error("failed writing sam record to \"%s\"", g_gArg.out_fn.c_str()); sam_close(g_outBamFp); sam_close(g_inBamFp); return -1; } +#endif } inBuf.ClearAll(); cout << "write round time: " << tw1.seconds_elapsed() << " s" << endl; @@ -371,7 +380,7 @@ int MarkDuplicates(int argc, char *argv[]) { // 计算统计信息 gMetrics.READ_PAIRS_EXAMINED /= 2; gMetrics.READ_PAIR_DUPLICATES /= 2; - for (auto &arr : gData.opticalDupIdxArr) gMetrics.READ_PAIR_OPTICAL_DUPLICATES += arr.size(); + for (auto &arr : gPipe.intersectData.opticalDupIdxArr) gMetrics.READ_PAIR_OPTICAL_DUPLICATES += arr.size(); gMetrics.READ_PAIR_OPTICAL_DUPLICATES = gMetrics.READ_PAIR_OPTICAL_DUPLICATES / 2; gMetrics.ESTIMATED_LIBRARY_SIZE = estimateLibrarySize(gMetrics.READ_PAIRS_EXAMINED - gMetrics.READ_PAIR_OPTICAL_DUPLICATES, diff --git a/src/sam/markdups/md_funcs.cpp b/src/sam/markdups/md_funcs.cpp index d94ffc7..15e7736 100644 --- a/src/sam/markdups/md_funcs.cpp +++ b/src/sam/markdups/md_funcs.cpp @@ -87,7 +87,7 @@ void buildReadEnds(BamWrap &bw, int64_t index, ReadNameParser &rnParser, ReadEnd k.read2ReferenceIndex = bc.mtid; } // Fill in the location information for optical duplicates - rnParser.AddLocationInformation(bw.query_name(), pKey); + if (!rnParser.wrong_name_format) rnParser.AddLocationInformation(bw.query_name(), pKey); // cout << k.tile << ' ' << k.x << ' ' << k.y << endl; // 计算位置key k.posKey = diff --git a/src/sam/markdups/md_types.h b/src/sam/markdups/md_types.h index e4d061c..a7d2c54 100644 --- a/src/sam/markdups/md_types.h +++ b/src/sam/markdups/md_types.h @@ -93,11 +93,9 @@ struct TaskSeqDupInfo { DPSet dupIdx; MDSet opticalDupIdx; DPSet repIdx; - MDSet singletonIdx; MDSet notDupIdx; MDSet notOpticalDupIdx; MDSet notRepIdx; - MDSet notSingletonIdx; }; /* 保存有未匹配pair位点的信息,包括read end数组和有几个未匹配的read end */ @@ -171,59 +169,4 @@ struct GlobalDataArg { vector> latterNotOpticalDupIdxArr; vector> latterNotRepIdxArr; vector> latterNotSingletonIdxArr; -}; - -struct ReadEndsArrIdIdx { - int arrId = 0; - uint64_t arrIdx = 0; - const ReadEnds *pE = nullptr; -}; - -struct ReadEndsGreaterThan { - bool operator()(const ReadEndsArrIdIdx &a, const ReadEndsArrIdIdx &b) { return *b.pE < *a.pE; } -}; -struct ReadEndsQueue { - // 将冗余索引和他对应的task vector对应起来 - vector*> *arr2d; - priority_queue, ReadEndsGreaterThan> minHeap; - uint64_t popNum = 0; - - int Init(vector*> *_arr2d) { - arr2d = _arr2d; - if (arr2d == nullptr) { - return -1; - } - for (int i = 0; i < arr2d->size(); ++i) { - auto v = (*arr2d)[i]; - if (!v->empty()) { - minHeap.push({i, 1, &(*v)[0]}); - } - } - return 0; - } - - const ReadEnds *Pop() { - const ReadEnds *ret = nullptr; - if (!minHeap.empty()) { - auto idx = minHeap.top(); - minHeap.pop(); - ++popNum; - ret = idx.pE; - auto v = (*arr2d)[idx.arrId]; - if (v->size() > idx.arrIdx) { - minHeap.push({idx.arrId, idx.arrIdx + 1, &(*v)[idx.arrIdx]}); - } - } - return ret; - } - - uint64_t Size() { - uint64_t len = 0; - if (arr2d != nullptr) { - for (auto v : *arr2d) { - len += v->size(); - } - } - return len - popNum; - } }; \ No newline at end of file diff --git a/src/sam/markdups/pipeline_md.cpp b/src/sam/markdups/pipeline_md.cpp index a155730..e12009f 100644 --- a/src/sam/markdups/pipeline_md.cpp +++ b/src/sam/markdups/pipeline_md.cpp @@ -807,6 +807,7 @@ static void *pipeGenRE(void *data) { TWIST(pipeArg.genRESig, BY, 1); TWIST(pipeArg.readSig, BY, -1); // 使用了一次读入的数据 } + cout << "pipe gen reads" << endl; return 0; } static void *pipeSort(void *data) { @@ -851,6 +852,7 @@ static void *pipeSort(void *data) { pipeArg.sortOrder += 1; TWIST(pipeArg.sortSig, BY, 1); } + cout << "end pipe sort" << endl; return 0; } static void *pipeMarkDup(void *data) { @@ -879,11 +881,11 @@ static void *pipeMarkDup(void *data) { } POSSESS(pipeArg.markDupSig); pipeArg.markDupFinish = 1; - TWIST(pipeArg.markDupSig, BY, 1); + TWIST(pipeArg.markDupSig, BY, 2); break; } /* 冗余检测 readends */ - cout << "markdup order: " << pipeArg.markDupOrder << endl; + // cout << "markdup order: " << pipeArg.markDupOrder << endl; tm_arr[3].acc_start(); doMarkDup(pipeArg); tm_arr[3].acc_end(); @@ -894,6 +896,7 @@ static void *pipeMarkDup(void *data) { pipeArg.markDupOrder += 1; TWIST(pipeArg.markDupSig, BY, 1); } + cout << "end pipe markdup" << endl; return 0; } static void *pipeIntersect(void *data) { @@ -925,6 +928,7 @@ static void *pipeIntersect(void *data) { pipeArg.intersectOrder += 1; } + cout << "end pipe intersect" << endl; return 0; } @@ -1011,7 +1015,7 @@ static void mergeAllTask(PipelineArg &pipeArg) { static void parallelPipeline() { Timer::log_time("pipeline start"); - PipelineArg pipeArg; + PipelineArg &pipeArg = gPipe; pipeArg.numThread = g_gArg.num_threads; pthread_t tidArr[5]; @@ -1082,7 +1086,7 @@ void pipelineMarkDups() { if (g_gArg.num_threads > 1) return parallelPipeline(); Timer::log_time("pipeline start"); - PipelineArg pipeArg; + PipelineArg &pipeArg = gPipe; pipeArg.numThread = g_gArg.num_threads; BamBufType inBamBuf(g_gArg.use_asyncio); inBamBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem); diff --git a/src/sam/markdups/shared_args.h b/src/sam/markdups/shared_args.h index 9021e00..97d3935 100644 --- a/src/sam/markdups/shared_args.h +++ b/src/sam/markdups/shared_args.h @@ -26,8 +26,8 @@ class MarkDupsArg; extern MarkDupsArg &g_mdArg; // markduplicate程序相关的参数 // 上边加下划线表示所有模块共享,不加下划线表示markduplicate计算时候内部使用 -class GlobalDataArg; -extern GlobalDataArg &gData; // 计算冗余过程中保存在全局的数据 +class PipelineArg; +extern PipelineArg &gPipe; // 计算冗余过程中保存在全局的数据 class DuplicationMetrics; extern DuplicationMetrics &gMetrics; // 用来计算统计信息 diff --git a/src/sam/utils/read_ends.h b/src/sam/utils/read_ends.h index 638a074..c3a49ad 100644 --- a/src/sam/utils/read_ends.h +++ b/src/sam/utils/read_ends.h @@ -32,6 +32,8 @@ struct PhysicalLocation { int16_t tile = -1; int32_t x = -1; int32_t y = -1; + //int16_t x = -1; + //int16_t y = -1; }; /* 包含了所有read ends信息,如picard里边的 ReadEndsForMarkDuplicates*/ diff --git a/src/sam/utils/read_name_parser.h b/src/sam/utils/read_name_parser.h index a3f5b80..d4d1cc4 100644 --- a/src/sam/utils/read_name_parser.h +++ b/src/sam/utils/read_name_parser.h @@ -50,6 +50,8 @@ struct ReadNameParser { const string DEFAULT_READ_NAME_REGEX = "(?:.*:)?([0-9]+)[^:]*:([0-9]+)[^:]*:([0-9]+)[^:]*$"; + bool wrong_name_format = false; + string readNameStored = ""; PhysicalLocation physicalLocationStored; int tmpLocationFields[3]; // for optimization of addLocationInformation @@ -166,6 +168,7 @@ struct ReadNameParser { "%s; Error Msg: %s", readNameRegex.c_str(), readName.c_str(), e.what()); warnedAboutRegexNotMatching = false; + wrong_name_format = true; } } catch (...) { if (warnedAboutRegexNotMatching) { @@ -175,6 +178,7 @@ struct ReadNameParser { "%s", readNameRegex.c_str(), readName.c_str()); warnedAboutRegexNotMatching = false; + wrong_name_format = true; } }