From 36b1430da584a056874ef3e3062ef7e7d8b3d520 Mon Sep 17 00:00:00 2001 From: zzh Date: Tue, 28 Nov 2023 10:45:40 +0800 Subject: [PATCH] =?UTF-8?q?=E4=B8=B2=E8=A1=8C=E5=BE=97=E5=88=B0=E4=BA=86?= =?UTF-8?q?=E6=AD=A3=E7=A1=AE=E7=9A=84=E7=BB=93=E6=9E=9C=EF=BC=8C=E4=BD=86?= =?UTF-8?q?=E8=BF=98=E9=9C=80=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- build.sh | 4 +- out.bam | Bin 1634 -> 0 bytes run.sh | 10 +- src/common/hts/bam_buf.cpp | 3 +- src/common/hts/bam_buf.h | 39 ++- src/common/hts/bam_wrap.h | 4 +- src/common/utils/yarn.cpp | 2 +- src/sam/markdups/markdups.cpp | 618 ++++----------------------------- src/sam/markdups/md_funcs.h | 329 ++++++++++++++++++ src/sam/markdups/parallel_md.h | 540 ++++++++++++++++++++++++++++ src/sam/markdups/serial_md.h | 418 ++++++++++++++++++++++ src/sam/utils/read_ends.h | 42 ++- 12 files changed, 1441 insertions(+), 568 deletions(-) delete mode 100644 out.bam create mode 100644 src/sam/markdups/md_funcs.h create mode 100644 src/sam/markdups/parallel_md.h create mode 100644 src/sam/markdups/serial_md.h diff --git a/build.sh b/build.sh index e560501..b4dd51f 100755 --- a/build.sh +++ b/build.sh @@ -1,7 +1,7 @@ #!/bin/bash dir="/home/zzh/work/GeneKit/picard_cpp/build" -#[ -d "$dir" ] && rm -rf "$dir" -#mkdir "$dir" +[ -d "$dir" ] && rm -rf "$dir" +mkdir "$dir" cd "$dir" #cmake .. -DCMAKE_BUILD_TYPE=Debug cmake .. -DCMAKE_BUILD_TYPE=Release diff --git a/out.bam b/out.bam deleted file mode 100644 index d885569aeef875cea068c8501f1ecc8594c7b499..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1634 zcmV-o2A%mIiwFb&00000{{{d;LjnLr2BlVAh?G?rK0BK~Glmp&QBW~NHzBig{(m-w z)s-yR-8NUmm6d7Snf+^LM`yoXb&Eg@ zidbbVl|xmG3PYHdGK;2TluIVL4d_slpo9`;)61&BG0cQuT=C2?At{Wp6eP=y&;({$ z5N$*TC-o7lX|&QAoD_%^<3{kT6p~bksbGYeEI$&LXfP98iL4e9m#7Ibf|9O_xJ0F~ zQj~Y&#UxP=x-6)|qC1iCB$%CY zPIY)!tSf~y4%#tcv#ZrfM zfRqQ6NGWd5L6CbIi%G(R47qX?r$H?497=nFJc<}+rHB$iEaoU~!dPjen6?)tpj=&A zU>d~AtPBaQbV?Z&_K*amSE^v3S~f&KM74xeS@0VJw#hlP0Xo9eJU_rKWjiad4Ca_9NCRp;YQ#4J{z8EY#^;A2~J}5Z2Vri0uDMi9d>*Cob)dY28 zccJYg6}}L}#gl~az>eaK)4>6^&$;6`4G$!7x!5}>WzZLw3%uDu?Zov(*-}HFGUKQT z?L@{29*!vE?12b+MqDn+knq}5o^jHI^K-Rp`(TTpJ{M2h&q4S^QpZUx!gx7OhKJM5 zK`Z;63gTtC*ykiXgRTbq28DB{X`XgiM6tjw&BcBadAEjS7MW_q zIDAHx;g;FCqxj7B{%EPy?i9y{Q(<(de|CO;sbyE1;?On+?&#lts9qWvjV-KL936@O z>>h;s?3qxNj{OqrN@KPY9VreD0aA-_9p5Ip_OTSx$Bd3s@p@STwZKpe)OkwwLD0ky_lj zzWw!9x!PzhR~I9@nwEAWgbMaY(3Zg;H-bdb@mnv)5<`!ky0HNyjvCh*u_Vyu*RPd8 zlIY0-izS7o2VW|Jq|x&3E9*fr==K+L>wCd7MrR&;AQ1`T#d{NxqT6$K*2N-2yZ8RI zHWoR0eC<>s3N&!^;Y5^Z>nHoy#G*pifB*KLSk!1>--Sdp==A1ey&zJI4qU#Phy+bf z|6GVgik83I*b9m6L91J~#>tMOySHDBHHUYhD+35^uwr-dLg(+xL$Gn7)qW4M4-ool zI1j;wh3?GdA=s?YWAmOC7?xl0AbX+FrDI+a;ENy6L$Eoar{3@&Fx+$^2gREbQg3>` z>^p;2PI`t=Q0bHhL0(gDdA>la@V2LAzd2~`j0f4L3jOUtu<@YYcf2I*!-Ot)5NtST z?5qcY*4G{c8x4A5)r$ySz45+h2wLBG5Ns~!;s+iCof!VmgCJt>CC?YQ|Hy}+tD8Rd zAn>*1L9mISvs-%dErd?oto7uz0J?w9(}Gm5p37^&qJftG@E|aJ{;xa)YX$oAp+a5@ ze66YFAy^{N)m9#Y-2uJ0U)Og~k2Z8(ZuNCsygZq=GJuS$7Y1M;ZLTlgk zAUM7~?|G>L^v{Q$7P!CrnU^Y*ed}`%f>eL`(#s2c9k}lKf@%Nyy=Mr}Ef0c^Hahcz z=L_8LyXir362E!AAmUBko7V#O7x%75B@1QWVN^X=Kdata, bw->b->data, bw->b->l_data); // 更新下次存储的位置 mem_offset = (mem_offset + bw->length() + 8 - 1) & ~((size_t)(8 - 1)); - - // cout << "size: " << bv.size() << " " << buf_name << endl; bv.push_back(bwp); } @@ -192,6 +190,7 @@ int AsyncIoBamBuf::ReadBam() { if (use_async_io_) { + hasThread = true; return async_read_bam(); } else diff --git a/src/common/hts/bam_buf.h b/src/common/hts/bam_buf.h index a3e3c17..61c0934 100644 --- a/src/common/hts/bam_buf.h +++ b/src/common/hts/bam_buf.h @@ -70,6 +70,20 @@ struct BamBuf free(bw); } } + void FreeMemory() // 释放开辟的内存 + { + if (this->mem != nullptr) + { + free(this->mem); + } + if (this->bw != nullptr) + { + bam_destroy1(bw->b); + free(bw); + } + this->mem = nullptr; + this->bw = nullptr; + } void prepare_read(); // 检查缓存是否还有空间 bool has_enough_space(); @@ -95,6 +109,7 @@ struct AsyncIoBamBuf BamBuf *pi_; // 当前用的buf BamBuf *po_; // 后台在读取的buf pthread_t *tid_ = NULL; + bool hasThread = false; int64_t legacy_size_ = 0; // 上一轮运算之后,缓存中还剩余的上次读取的read数量 bool first_read_ = true; int last_read_num_ = 0; // 上一次读取了多少reads @@ -112,7 +127,8 @@ struct AsyncIoBamBuf { if (tid_ != NULL) { - pthread_join(*tid_, 0); + if (hasThread) + pthread_join(*tid_, 0); free(tid_); } // 其他的内存就等程序结束自动释放 @@ -128,6 +144,7 @@ struct AsyncIoBamBuf int ReadBam(); // 为下一次读取做准备, 计算一些边界条件 void ClearBeforeIdx(size_t idxInBv); + vector &GetBamArr() { return bam_arr_; } // 获取bam array // 清空上一次所有读入的数据 void ClearAll(); // 包含的read数量 @@ -146,6 +163,11 @@ struct AsyncIoBamBuf } return std::move(vector()); } + void FreeMemory() + { + buf1_.FreeMemory(); + buf2_.FreeMemory(); + } // 同步读取 int sync_read_bam(); @@ -156,16 +178,13 @@ struct AsyncIoBamBuf void resize_buf(); inline void refresh_bam_arr() { - if (this->Size() != bam_arr_.size()) + bam_arr_.resize(this->Size()); + for (int i = 0; i < bam_arr_.size(); ++i) { - bam_arr_.resize(this->Size()); - for (int i = 0; i < bam_arr_.size(); ++i) - { - if (i < legacy_size_) - bam_arr_[i] = (*po_)[i]; - else - bam_arr_[i] = (*pi_)[i - legacy_size_]; - } + if (i < legacy_size_) + bam_arr_[i] = (*po_)[i]; + else + bam_arr_[i] = (*pi_)[i - legacy_size_]; } } }; diff --git a/src/common/hts/bam_wrap.h b/src/common/hts/bam_wrap.h index a1b8fcc..d338256 100644 --- a/src/common/hts/bam_wrap.h +++ b/src/common/hts/bam_wrap.h @@ -297,7 +297,7 @@ struct BamWrap int64_t end_pos = bam_endpos(b); const uint32_t *cigar = bam_get_cigar(b); const bam1_core_t &bc = b->core; - for (int i = bc.n_cigar; i >= 0; --i) + for (int i = bc.n_cigar - 1; i >= 0; --i) { const char c = bam_cigar_opchr(cigar[i]); const int len = bam_cigar_oplen(cigar[i]); @@ -306,7 +306,7 @@ struct BamWrap else break; } - return end_pos; + return end_pos - 1; } /* 获取碱基质量分数的加和 */ diff --git a/src/common/utils/yarn.cpp b/src/common/utils/yarn.cpp index c7c9b6a..09c24b1 100644 --- a/src/common/utils/yarn.cpp +++ b/src/common/utils/yarn.cpp @@ -96,7 +96,7 @@ local void fail(int err, char const *file, long line, char const *func) { typedef void *(*malloc_t)(size_t); typedef void (*free_t)(void *); local malloc_t my_malloc_f = malloc; -local free_t my_free = free; +local free_t my_free = free; // Use user-supplied allocation routines instead of malloc() and free(). void yarn_mem(malloc_t lease, free_t vacate) { diff --git a/src/sam/markdups/markdups.cpp b/src/sam/markdups/markdups.cpp index fc29615..3d8ebbc 100644 --- a/src/sam/markdups/markdups.cpp +++ b/src/sam/markdups/markdups.cpp @@ -28,12 +28,12 @@ Date : 2023/10/23 #include #include #include - - +#include using namespace std; using std::cout; + #define SMA_TAG_PG "PG" #define BAM_BLOCK_SIZE 2 * 1024 * 1024 @@ -41,437 +41,11 @@ using std::cout; static Timer tm_arr[10]; // 用来测试性能 -/* 前向声明 */ -class ThMarkDupArg; +#include "md_funcs.h" +#include "serial_md.h" +#include "parallel_md.h" -/* 全局本地变量 */ -static queue g_qpThMarkDupArg; // 存放线程变量的队列 -static lock_t *g_queueFirstLock = NEW_LOCK(-1); // 队列的第一个任务是否完成 -static lock_t *g_readyToReadLock = NEW_LOCK(-1); // 通知主线程是否可以进行下一次读取 -static vector g_vRnParser; // 每个线程一个read name parser -static int g_numDuplicateIndices = 0; // 找到的冗余read总数 -static samFile *g_outBamFp = nullptr; // 输出文件, sam或者bam格式 -static sam_hdr_t *g_outBamHeader; // 输出文件的header -static int g_maxJobNum = 0; // 每次读取新的数据后,新增的任务数量 -static int g_jobNumForRead = 0; // 任务数量降到当前值时开始下一轮读取 -static volatile int64_t g_bamLoadedNum = 0; // 已经读入的read总数 -static volatile int64_t g_bamWritenNum = 0; // 已经处理完,写入输出文件的read总数 -static vector g_vDupIdx; // 线程内部计算得出的 -static vector g_vOpticalDupIdx; -static set g_sDupIdxLatter; -static set g_sOpticalDupIdxLatter; -/* 参数对象作为全局对象,免得多次作为参数传入函数中 */ -static GlobalArg &g_gArg = GlobalArg::Instance(); -static MarkDupsArg g_mdArg; - -/* - * 计算read的分数 - */ -static int16_t computeDuplicateScore(BamWrap &bw) -{ - int16_t score = 0; - switch (g_mdArg.DUPLICATE_SCORING_STRATEGY) - { - case ns_md::SUM_OF_BASE_QUALITIES: - // two (very) long reads worth of high-quality bases can go over Short.MAX_VALUE/2 - // and risk overflow. - score += (int16_t)min(bw.GetSumOfBaseQualities(), INT16_MAX / 2); - break; - case ns_md::TOTAL_MAPPED_REFERENCE_LENGTH: - if (!bw.GetReadUnmappedFlag()) - // no need to remember the score since this scoring mechanism is symmetric - score = (int16_t)min(bw.GetReferenceLength(), INT16_MAX / 2); - break; - case ns_md::RANDOM: - // The RANDOM score gives the same score to both reads so that they get filtered together. - // it's not critical do use the readName since the scores from both ends get added, but it seem - // to be clearer this way. - score += (short)(Murmur3::Instance().HashUnencodedChars(bw.query_name()) & 0b11111111111111); - // subtract Short.MIN_VALUE/4 from it to end up with a number between - // 0 and Short.MAX_VALUE/2. This number can be then discounted in case the read is - // not passing filters. We need to stay far from overflow so that when we add the two - // scores from the two read mates we do not overflow since that could cause us to chose a - // failing read-pair instead of a passing one. - score -= INT16_MIN / 4; - default: - break; - } - // make sure that filter-failing records are heavily discounted. (the discount can happen twice, once - // for each mate, so need to make sure we do not subtract more than Short.MIN_VALUE overall.) - score += bw.GetReadFailsVendorQualityCheckFlag() ? (int16_t)(INT16_MIN / 2) : 0; - - return score; -} - -/* - * Builds a read ends object that represents a single read. 用来表示一个read的特征结构 - */ -static void buildReadEnds(BamWrap &bw, int64_t index, ReadNameParser &rnParser, ReadEnds *pKey) -{ - auto &k = *pKey; - auto &bc = bw.b->core; - k.read1ReferenceIndex = bc.tid; - k.read1Coordinate = (bc.flag & BAM_FREVERSE) ? bw.GetUnclippedEnd() : bw.GetUnclippedStart(); - k.orientation = (bc.flag & BAM_FREVERSE) ? ReadEnds::R : ReadEnds::F; - k.read1IndexInFile = index; - k.score = computeDuplicateScore(bw); - // Doing this lets the ends object know that it's part of a pair - if (bw.GetReadPairedFlag() && !bw.GetMateUnmappedFlag()) - { - k.read2ReferenceIndex = bc.mtid; - } - // Fill in the location information for optical duplicates - rnParser.AddLocationInformation(bw.query_name(), pKey); - // cout << k.tile << ' ' << k.x << ' ' << k.y << endl; - // 计算位置key - k.posKey = BamWrap::bam_global_pos(k.read1ReferenceIndex, k.read1Coordinate); // << 1 | k.orientation; -} - -/** - * Takes a list of ReadEndsForMarkDuplicates objects and identify the representative read based on - * quality score. For all members of the duplicate set, add the read1 index-in-file of the representative - * read to the records of the first and second in a pair. This value becomes is used for - * the 'DI' tag. - */ -static void addRepresentativeReadIndex(vector &vpRe) -{ - -} - -/* 处理一组pairend的readends,标记冗余 */ -static void markDuplicatePairs(vector &vpRe, set *psDupIdx, set *psOpticalDupIdx) -{ - if (vpRe.size() < 2) { - if (vpRe.size() == 1) - { - // addSingletonToCount(libraryIdGenerator); - } - return; - } - - int maxScore = 0; - ReadEnds *pBestRe = nullptr; - /** All read ends should have orientation FF, FR, RF, or RR **/ - for (auto pe: vpRe) // 找分数最高的readend - { - if (pe->score > maxScore || pBestRe == nullptr) - { - maxScore = pe->score; - pBestRe = pe; - } - } - if (!g_mdArg.READ_NAME_REGEX.empty()) // 检查光学冗余 - { - // trackOpticalDuplicates - } - - for (auto pe: vpRe) // 对非best read标记冗余 - { - if (pe != pBestRe) // 非best - { - psDupIdx->insert(pe->read1IndexInFile); // 添加read1 - if (pe->read2IndexInFile != pe->read1IndexInFile) - psDupIdx->insert(pe->read2IndexInFile); // 添加read2 - } - } - - if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS) - { - addRepresentativeReadIndex(vpRe); - } -} - -/* 处理一组非paired的readends,标记冗余 */ -static void markDuplicateFragments(vector &vpRe, - bool containsPairs, - set *psDupIdx, - set *psOpticalDupIdx) -{ - if (containsPairs) - { - for (auto pe: vpRe) - { - if (!pe->IsPaired()) - { - psDupIdx->insert(pe->read1IndexInFile); - } - } - } - else - { - int maxScore = 0; - ReadEnds *pBest = nullptr; - for (auto pe : vpRe) - { - if (pe->score > maxScore || pBest == nullptr) - { - maxScore = pe->score; - pBest = pe; - } - } - - for (auto pe : vpRe) - { - if (pe != pBest) - { - psDupIdx->insert(pe->read1IndexInFile); - } - } - } -} - -/* 多线程处理冗余参数结构体 */ -struct ThMarkDupArg -{ - int64_t bamStartIdx; // 当前vBam数组中第一个bam记录在整体bam中所处的位置 - long seq; // 当前任务在所有任务的排序 - bool more; // 后面还有任务 - volatile bool finish; // 当前任务有没有处理完 - vector vBam; // 存放待处理的bam read - map> mvPair; // 以冗余位置为索引,保存所有pairend reads - map> mvFrag; // 保存所有reads,包括pairend - map> msDupIdx; // 冗余read的索引 - map> msOpticalDupIdx; // optical冗余read的索引 - unordered_map umReadEnds; // 用来寻找pair end -}; - -/* - * 多线程查找和标记冗余函数 - */ -void thread_markdups(void *arg, int tid) -{ - auto &p = *(ThMarkDupArg *)arg; - - /* 处理每个read,创建ReadEnd,并放入frag和pair中 */ - for (int i = 0; i < p.vBam.size(); ++i) // 循环处理每个read - { - BamWrap *bw = p.vBam[i]; - const int64_t bamIdx = p.bamStartIdx + i; - if (bw->GetReadUnmappedFlag()) - { - if (bw->b->core.tid == -1) - // When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort). - break; - } - else if (!bw->IsSecondaryOrSupplementary()) // 是主要比对 - { - ReadEnds fragEnd; - buildReadEnds(*bw, bamIdx, g_vRnParser[tid], &fragEnd); - p.mvFrag[fragEnd.posKey].push_back(fragEnd); // 添加进frag集合 - if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) // 是pairend而且互补的read也比对上了 - { - string key = bw->query_name(); - if (p.umReadEnds.find(key) == p.umReadEnds.end()) - { - p.umReadEnds[key] = fragEnd; - } - else // 找到了pairend - { - auto pairedEnds = p.umReadEnds.at(key); - p.umReadEnds.erase(key); // 删除找到的pairend - const int matesRefIndex = fragEnd.read1ReferenceIndex; - const int matesCoordinate = fragEnd.read1Coordinate; - // Set orientationForOpticalDuplicates, which always goes by the first then the second end for the strands. NB: must do this - // before updating the orientation later. - if (bw->GetFirstOfPairFlag()) - { - pairedEnds.orientationForOpticalDuplicates = - ReadEnds::GetOrientationByte(bw->GetReadNegativeStrandFlag(), pairedEnds.orientation == ReadEnds::R); - } - else - { - pairedEnds.orientationForOpticalDuplicates = - ReadEnds::GetOrientationByte(pairedEnds.orientation == ReadEnds::R, bw->GetReadNegativeStrandFlag()); - } - // If the other read is actually later, simply add the other read's data as read2, else flip the reads - if (matesRefIndex > pairedEnds.read1ReferenceIndex || - (matesRefIndex == pairedEnds.read1ReferenceIndex && matesCoordinate >= pairedEnds.read1Coordinate)) - { - pairedEnds.read2ReferenceIndex = matesRefIndex; - pairedEnds.read2Coordinate = matesCoordinate; - pairedEnds.read2IndexInFile = bamIdx; - pairedEnds.orientation = ReadEnds::GetOrientationByte(pairedEnds.orientation == ReadEnds::R, - bw->GetReadNegativeStrandFlag()); - - // if the two read ends are in the same position, pointing in opposite directions, - // the orientation is undefined and the procedure above - // will depend on the order of the reads in the file. - // To avoid this, we set it explicitly (to FR): - if (pairedEnds.read2ReferenceIndex == pairedEnds.read1ReferenceIndex && - pairedEnds.read2Coordinate == pairedEnds.read1Coordinate && - pairedEnds.orientation == ReadEnds::RF) - { - pairedEnds.orientation = ReadEnds::FR; - } - } - else - { - pairedEnds.read2ReferenceIndex = pairedEnds.read1ReferenceIndex; - pairedEnds.read2Coordinate = pairedEnds.read1Coordinate; - pairedEnds.read2IndexInFile = pairedEnds.read1IndexInFile; - pairedEnds.read1ReferenceIndex = matesRefIndex; - pairedEnds.read1Coordinate = matesCoordinate; - pairedEnds.read1IndexInFile = bamIdx; - pairedEnds.orientation = ReadEnds::GetOrientationByte(bw->GetReadNegativeStrandFlag(), - pairedEnds.orientation == ReadEnds::R); - } - - pairedEnds.score += computeDuplicateScore(*bw); - p.mvPair[pairedEnds.posKey].push_back(pairedEnds); - } - } - } - } - /* generateDuplicateIndexes,计算冗余read在所有read中的位置索引 */ - // 先处理 pair - int dupNum = 0; - vector vRePotentialDup; // 有可能是冗余的reads - for (auto &e : p.mvPair) // 按比对的位置先后进行遍历 - { - if (e.second.size() > 1) // 有潜在的冗余 - { - vRePotentialDup.clear(); - ReadEnds *pReadEnd = nullptr; - for (auto &re : e.second) - { - if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) - vRePotentialDup.push_back(&re); - else - { - markDuplicatePairs(vRePotentialDup, &p.msDupIdx[e.first], &p.msOpticalDupIdx[e.first]); - vRePotentialDup.clear(); - vRePotentialDup.push_back(&re); - pReadEnd = &re; - } - } - markDuplicatePairs(vRePotentialDup, &p.msDupIdx[e.first], &p.msOpticalDupIdx[e.first]); - } - } - // 再处理frag - bool containsPairs = false; - bool containsFrags = false; - for (auto &e : p.mvFrag) - { - if (e.second.size() > 1) // 有潜在的冗余 - { - vRePotentialDup.clear(); - ReadEnds *pReadEnd = nullptr; - for (auto &re : e.second) - { - if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, false)) - { - vRePotentialDup.push_back(&re); - containsPairs = containsPairs || re.IsPaired(); - containsFrags = containsFrags || !re.IsPaired(); - } - else - { - if (vRePotentialDup.size() > 1 && containsFrags) - { - markDuplicateFragments(vRePotentialDup, containsPairs, &p.msDupIdx[e.first], &p.msOpticalDupIdx[e.first]); - } - vRePotentialDup.clear(); - vRePotentialDup.push_back(&re); - pReadEnd = &re; - containsPairs = re.IsPaired(); - containsFrags = !re.IsPaired(); - } - } - if (vRePotentialDup.size() > 1 && containsFrags) { - markDuplicateFragments(vRePotentialDup, containsPairs, &p.msDupIdx[e.first], &p.msOpticalDupIdx[e.first]); - } - } - } - - // cout << tid << '\t' << "dup: " << dupNum << endl; - // cout << tid << " all: no: " << p.vBam.size() << '\t' << p.umReadEnds.size() << endl; - /* 本段数据处理完成,告诉输出线程 */ - POSSESS(g_queueFirstLock); - p.finish = true; - // cout << tid << ": process: " << p.seq << endl; - auto front = g_qpThMarkDupArg.front(); - if (front->finish) - { - TWIST(g_queueFirstLock, TO, front->seq); // 通知写线程,当前队列头部完成的任务 - } else { - RELEASE(g_queueFirstLock); - } -} - -/* - * 多线程将结果写入文件,写之前需要合并相邻线程的未处理的结果 - */ -void thread_write(void *) -{ - bool more = false; - long seq = 0; - long unPairedNum = 0; - POSSESS(g_queueFirstLock); - WAIT_FOR(g_queueFirstLock, TO_BE, seq++); // 等待首个任务完成 - auto lastP = g_qpThMarkDupArg.front(); // 取队首的数据 - auto umUnpairedReadEnds = lastP->umReadEnds; // 还未找到pair的read - auto p = lastP; - g_qpThMarkDupArg.pop(); // 删除队首 - TWIST(g_queueFirstLock, TO, seq); // 解锁 - more = lastP->more; // 是否还有下一个任务 - while (more) // 循环处理,将结果写入文件 - { - POSSESS(g_queueFirstLock); - if (g_qpThMarkDupArg.empty()) // 有可能新任务没来得及添加进队列 - { - RELEASE(g_queueFirstLock); - continue; - } - WAIT_FOR(g_queueFirstLock, TO_BE, seq); // 等待任务完成 - p = g_qpThMarkDupArg.front(); - if (!p->finish) // 有可能这个任务没有完成,是下边那个TWIST导致进到这里,因为这一段代码可能运行比较快 - { - TWIST(g_queueFirstLock, TO, -1); // 此时队首任务没完成,-1可以让锁无法进入到这里,避免无效获得锁 - continue; - } - g_qpThMarkDupArg.pop(); - TWIST(g_queueFirstLock, TO, seq + 1); - /* 处理结果数据 */ - // cout << "finish: " << seq - 1 << '\t' << "lastIdx: " << p->bamStartIdx+p->vBam.size() << endl; - - for (auto &e : p->umReadEnds) // 在当前任务中找有没有与上一个任务中没匹配的read,相匹配的pair - { - if (umUnpairedReadEnds.find(e.first) != umUnpairedReadEnds.end()) - umUnpairedReadEnds.erase(e.first); // 找到了pair - else - umUnpairedReadEnds.insert(e); // 没有pair,则添加 - } - - /* 更新写入read数量和状态 */ - POSSESS(g_readyToReadLock); - g_bamWritenNum += lastP->vBam.size(); - // cout << "write: " << g_qpThMarkDupArg.size() << endl; - if (g_qpThMarkDupArg.size() <= g_jobNumForRead) - { - TWIST(g_readyToReadLock, TO, 1); - } - else - { - RELEASE(g_readyToReadLock); - } - /* 准备下一轮循环 */ - delete lastP; - more = p->more; - lastP = p; - seq++; - } - unPairedNum = umUnpairedReadEnds.size(); - - cout << "Finally unpaired read num: " << unPairedNum << endl; - - // 处理最后一个数据 - POSSESS(g_readyToReadLock); - g_bamWritenNum += lastP->vBam.size(); - TWIST(g_readyToReadLock, TO, 1); - // cout << "last finish: " << seq - 1 << endl; - pthread_exit(0); -} /* * mark duplicate 入口,假定bam是按照比对后的坐标排序的,同一个样本的话不需要考虑barcode的问题 @@ -492,148 +66,110 @@ int MarkDuplicates(int argc, char *argv[]) parser.SetReadNameRegex(g_mdArg.READ_NAME_REGEX); // 用来解析read name中的tile,x,y信息 /* 打开输入bam文件 */ - sam_hdr_t *inBamHeader; - samFile *inBamFp; - inBamFp = sam_open_format(g_gArg.in_fn.c_str(), "r", nullptr); - if (!inBamFp) + g_inBamFp = sam_open_format(g_gArg.in_fn.c_str(), "r", nullptr); + if (!g_inBamFp) { Error("[%s] load sam/bam file failed.\n", __func__); return -1; } - hts_set_opt(inBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); - inBamHeader = sam_hdr_read(inBamFp); // 读取header + hts_set_opt(g_inBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); + g_inBamHeader = sam_hdr_read(g_inBamFp); // 读取header /* 利用线程池对输入输出文件进行读写 */ htsThreadPool htsPoolRead = {NULL, 0}; // 多线程读取,创建线程池 htsThreadPool htsPoolWrite = {NULL, 0}; // 读写用不同的线程池 - htsPoolRead.pool = hts_tpool_init(g_gArg.num_threads); + // htsPoolRead.pool = hts_tpool_init(g_gArg.num_threads); + htsPoolRead.pool = hts_tpool_init(16); htsPoolWrite.pool = hts_tpool_init(g_gArg.num_threads); if (!htsPoolRead.pool || !htsPoolWrite.pool) { Error("[%d] failed to set up thread pool", __LINE__); + sam_close(g_inBamFp); return -1; } - hts_set_opt(inBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead); + hts_set_opt(g_inBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead); /* 初始化输出文件 */ char modeout[12] = "wb"; sam_open_mode(modeout + 1, g_gArg.out_fn.c_str(), NULL); g_outBamFp = sam_open(g_gArg.out_fn.c_str(), modeout); - g_outBamHeader = sam_hdr_dup(inBamHeader); + g_outBamHeader = sam_hdr_dup(g_inBamHeader); + hts_set_opt(g_outBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); + hts_set_opt(g_outBamFp, HTS_OPT_THREAD_POOL, &htsPoolWrite); // 用同样的线程池处理输出文件 + + + /* 冗余检查和标记 */ + if (g_gArg.num_threads == 1) + { + serialMarkDups(); // 串行运行 + } + else + { + parallelMarkDups(); // 并行运行 + } + + /* 标记冗余, 将处理后的结果写入文件 */ + sam_close(g_inBamFp); // 重新打开bam文件 + g_inBamFp = sam_open_format(g_gArg.in_fn.c_str(), "r", nullptr); + if (!g_inBamFp) + { + Error("[%s] load sam/bam file failed.\n", __func__); + return -1; + } + hts_set_opt(g_inBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); + hts_set_opt(g_inBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead); + g_inBamHeader = sam_hdr_read(g_inBamFp); // 读取header if (sam_hdr_write(g_outBamFp, g_outBamHeader) != 0) { Error("failed writing header to \"%s\"", g_gArg.out_fn.c_str()); sam_close(g_outBamFp); + sam_close(g_inBamFp); return -1; } - hts_set_opt(g_outBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE); - hts_set_opt(g_outBamFp, HTS_OPT_THREAD_POOL, &htsPoolWrite); // 用同样的线程池处理输出文件 - - // /* 读取缓存初始化 */ - BamBufType inBamBuf(g_gArg.use_asyncio); - inBamBuf.Init(inBamFp, inBamHeader, g_gArg.max_mem); - - /* 循环读入信息,并处理 */ - g_maxJobNum = g_gArg.num_threads * 10; - // g_maxJobNum = g_gArg.num_threads * 3; - g_jobNumForRead = g_gArg.num_threads * 2; - - int64_t x_all = 0; // for test - int64_t jobSeq = 0; - int64_t processedBamNum = 0; // 记录每个轮次累计处理的reads数量,用来计算每个read在整个文件中的索引位置 - threadpool thpool = thpool_init(g_gArg.num_threads); // 创建mark dup所需的线程池 - thread *writeth = LAUNCH(thread_write, nullptr); // 启动处理结果的的线程 - int bamRemainSize = 0; // 上一轮还剩下的bam数量,包含已经在任务里的和没有放进任务的 - int numReadsForEachJob = 0; // 每个线程处理的read数量,第一次读取的时候进行设置 - int lastRoundUnProcessed = 0; // 上一轮没有放进任务里的read数量 - int curRoundProcessed = 0; // 这一轮放进任务的read数量 - while (inBamBuf.ReadStat() >= 0) + // 输出index文件 + string indexFn = g_gArg.out_fn + ".csi"; // 现在索引都是csi格式的 + if (sam_idx_init(g_outBamFp, g_outBamHeader, 14 /*csi*/, indexFn.c_str()) < 0) { - /* 读取bam文件中的read */ - int readNum = inBamBuf.ReadBam(); - if (numReadsForEachJob == 0) - numReadsForEachJob = readNum / g_maxJobNum; // 第一次读取bam的时候进行设置 - g_bamLoadedNum += readNum; - - cout << readNum << endl; // 这一轮读取的bam数量 - - /* 多线程处理 任务数是线程数的10倍 */ - tm_arr[0].acc_start(); - curRoundProcessed = 0; // 当前轮次已经处理的reads数量 - int numNeedToProcess = inBamBuf.Size() - bamRemainSize + lastRoundUnProcessed; // 当前需要处理的bam数量 - for (int i = 0; numNeedToProcess >= numReadsForEachJob; ++i) // 只有待处理的reads数量大于一次任务的数量时,新建任务 - { - int startIdx = i * numReadsForEachJob + bamRemainSize - lastRoundUnProcessed; - int endIdx = (i + 1) * numReadsForEachJob + bamRemainSize - lastRoundUnProcessed; - - ThMarkDupArg *thArg = new ThMarkDupArg({processedBamNum + curRoundProcessed, - jobSeq++, - true, - false, - inBamBuf.Slice(startIdx, endIdx)}); - POSSESS(g_queueFirstLock); // 加锁 - g_qpThMarkDupArg.push(thArg); // 将新任务需要的参数添加到队列 - RELEASE(g_queueFirstLock); // 解锁 - thpool_add_work(thpool, thread_markdups, (void *)thArg); // 添加新任务 - curRoundProcessed += endIdx - startIdx; - numNeedToProcess -= numReadsForEachJob; - } - processedBamNum += curRoundProcessed; - lastRoundUnProcessed = numNeedToProcess; - - /* 等待可以继续读取的信号 */ - POSSESS(g_readyToReadLock); - WAIT_FOR(g_readyToReadLock, TO_BE, 1); - bamRemainSize = g_bamLoadedNum - g_bamWritenNum; - - while (bamRemainSize >= inBamBuf.Size() / 2) - { // 要保留的多于现在有的bam数量的一半,那就等待write线程继续处理 - TWIST(g_readyToReadLock, TO, 0); - POSSESS(g_readyToReadLock); - WAIT_FOR(g_readyToReadLock, TO_BE, 1); - bamRemainSize = g_bamLoadedNum - g_bamWritenNum; - } - inBamBuf.ClearBeforeIdx(inBamBuf.Size() - bamRemainSize); // 清理掉已经处理完的reads - // cout << g_bamLoadedNum << '\t' << g_bamWritenNum << '\t' << bamRemainSize << '\t' << inBamBuf.Size() << endl; - TWIST(g_readyToReadLock, TO, 0); - + Error("failed to open index \"%s\" for writing", indexFn.c_str()); + sam_close(g_outBamFp); + sam_close(g_inBamFp); + return -1; } - /* 数据读完了,放一个空的任务,好让write thread停下来 */ - ThMarkDupArg *thArg = nullptr; - if (lastRoundUnProcessed > 0) // 最后一轮还有没有添加进任务的read数据 - { - thArg = new ThMarkDupArg({processedBamNum + curRoundProcessed, jobSeq++, false, false, - inBamBuf.Slice(inBamBuf.Size() - lastRoundUnProcessed, inBamBuf.Size())}); - processedBamNum += lastRoundUnProcessed; - } - else - { - thArg = new ThMarkDupArg({0, jobSeq++, false, false}); - } - POSSESS(g_queueFirstLock); // 加锁 - g_qpThMarkDupArg.push(thArg); // 将新任务需要的参数添加到队列 - RELEASE(g_queueFirstLock); // 解锁 - thpool_add_work(thpool, thread_markdups, (void *)thArg); // 添加新任务 - - /* 同步所有线程 */ - thpool_wait(thpool); - thpool_destroy(thpool); - JOIN(writeth); - - cout <<"x_all: " << x_all << endl; - cout << "loaded: " << g_bamLoadedNum << endl; - cout << "writen: " << g_bamWritenNum << endl; - cout << "processedBamNum: " << processedBamNum << endl; - /* 标记冗余, 将处理后的结果写入文件 */ + // 读取输入文件 + // BamBufType inBuf(false); // inBuf(g_gArg.use_asyncio); + BamBufType inBuf(g_gArg.use_asyncio); + inBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem); +// while(inBuf.ReadStat() >= 0) +// { +// size_t readNum = inBuf.ReadBam(); +// cout << "read: " << readNum << endl; +// for (size_t i = 0; i < inBuf.Size(); ++i) +// { +// /* 判断是否冗余 */ +// if (sam_write1(g_outBamFp, g_outBamHeader, inBuf[i]->b) < 0) +// { +// Error("failed writing header to \"%s\"", g_gArg.out_fn.c_str()); +// sam_close(g_outBamFp); +// sam_close(g_inBamFp); +// return -1; +// } +// } +// inBuf.ClearAll(); +// } +// if (sam_idx_save(g_outBamFp) < 0) +// { +// Error("writing index failed"); +// sam_close(g_outBamFp); +// sam_close(g_inBamFp); +// return -1; +// } /* 关闭文件,收尾清理 */ sam_close(g_outBamFp); - sam_close(inBamFp); - - cout << "read ends size: " << sizeof(ReadEnds) << endl; + sam_close(g_inBamFp); cout << " 总时间: " << time_all.seconds_elapsed() << endl; - cout << "计算read end: " << tm_arr[0].acc_seconds_elapsed() << endl; +// cout << "计算read end: " << tm_arr[0].acc_seconds_elapsed() << endl; Timer::log_time("程序结束"); return 0; } \ No newline at end of file diff --git a/src/sam/markdups/md_funcs.h b/src/sam/markdups/md_funcs.h new file mode 100644 index 0000000..b6c06e2 --- /dev/null +++ b/src/sam/markdups/md_funcs.h @@ -0,0 +1,329 @@ +/* 前向声明 */ +class ThMarkDupArg; + +/* 全局本地变量 */ +static queue g_qpThMarkDupArg; // 存放线程变量的队列 +static lock_t *g_queueFirstLock = NEW_LOCK(-1); // 队列的第一个任务是否完成 +static lock_t *g_readyToReadLock = NEW_LOCK(-1); // 通知主线程是否可以进行下一次读取 +static vector g_vRnParser; // 每个线程一个read name parser +static int g_numDuplicateIndices = 0; // 找到的冗余read总数 +static samFile *g_inBamFp; // 输入的bam文件 +static sam_hdr_t *g_inBamHeader; // 输入的bam文件头信息 +static samFile *g_outBamFp = nullptr; // 输出文件, sam或者bam格式 +static sam_hdr_t *g_outBamHeader; // 输出文件的header +static int g_maxJobNum = 0; // 每次读取新的数据后,新增的任务数量 +static int g_jobNumForRead = 0; // 任务数量降到当前值时开始下一轮读取 +static volatile int64_t g_bamLoadedNum = 0; // 已经读入的read总数 +static volatile int64_t g_bamUsedNum = 0; // 已经处理完,写入输出文件的read总数 +static vector g_vDupIdx; // 线程内部计算得出的 +static vector g_vOpticalDupIdx; +static set g_sDupIdxLatter; +static set g_sOpticalDupIdxLatter; +static bool g_serialProcess = false; // 是否串行运行 + +/* 参数对象作为全局对象,免得多次作为参数传入函数中 */ +static GlobalArg &g_gArg = GlobalArg::Instance(); +static MarkDupsArg g_mdArg; + +/* 清除key位置的数据 */ +static inline void clearIdxAtPos(int64_t key, map> *pmsIdx) +{ + auto &msIdx = *pmsIdx; + if (msIdx.find(key) != msIdx.end()) + msIdx[key].clear(); // 清除该位点的冗余结果 +} + +/* 删除key位置的数据 */ +static inline void delIdxAtPos(int64_t key, map> *pmsIdx) +{ + auto &msIdx = *pmsIdx; + if (msIdx.find(key) != msIdx.end()) + msIdx.erase(key); +} + +/* + * 计算read的分数 + */ +static int16_t computeDuplicateScore(BamWrap &bw) +{ + int16_t score = 0; + switch (g_mdArg.DUPLICATE_SCORING_STRATEGY) + { + case ns_md::SUM_OF_BASE_QUALITIES: + // two (very) long reads worth of high-quality bases can go over Short.MAX_VALUE/2 + // and risk overflow. + score += (int16_t)min(bw.GetSumOfBaseQualities(), INT16_MAX / 2); + break; + case ns_md::TOTAL_MAPPED_REFERENCE_LENGTH: + if (!bw.GetReadUnmappedFlag()) + // no need to remember the score since this scoring mechanism is symmetric + score = (int16_t)min(bw.GetReferenceLength(), INT16_MAX / 2); + break; + case ns_md::RANDOM: + // The RANDOM score gives the same score to both reads so that they get filtered together. + // it's not critical do use the readName since the scores from both ends get added, but it seem + // to be clearer this way. + score += (short)(Murmur3::Instance().HashUnencodedChars(bw.query_name()) & 0b11111111111111); + // subtract Short.MIN_VALUE/4 from it to end up with a number between + // 0 and Short.MAX_VALUE/2. This number can be then discounted in case the read is + // not passing filters. We need to stay far from overflow so that when we add the two + // scores from the two read mates we do not overflow since that could cause us to chose a + // failing read-pair instead of a passing one. + score -= INT16_MIN / 4; + default: + break; + } + // make sure that filter-failing records are heavily discounted. (the discount can happen twice, once + // for each mate, so need to make sure we do not subtract more than Short.MIN_VALUE overall.) + score += bw.GetReadFailsVendorQualityCheckFlag() ? (int16_t)(INT16_MIN / 2) : 0; + + return score; +} + +/* + * Builds a read ends object that represents a single read. 用来表示一个read的特征结构 + */ +static void buildReadEnds(BamWrap &bw, int64_t index, ReadNameParser &rnParser, ReadEnds *pKey) +{ + auto &k = *pKey; + auto &bc = bw.b->core; + k.read1FirstOfPair = bw.GetFirstOfPairFlag(); + k.read1ReferenceIndex = bc.tid; + k.read1Coordinate = (bc.flag & BAM_FREVERSE) ? bw.GetUnclippedEnd() : bw.GetUnclippedStart(); + k.orientation = (bc.flag & BAM_FREVERSE) ? ReadEnds::R : ReadEnds::F; + + k.read1IndexInFile = index; + k.score = computeDuplicateScore(bw); + // Doing this lets the ends object know that it's part of a pair + if (bw.GetReadPairedFlag() && !bw.GetMateUnmappedFlag()) + { + k.read2ReferenceIndex = bc.mtid; + } + // Fill in the location information for optical duplicates + rnParser.AddLocationInformation(bw.query_name(), pKey); + // cout << k.tile << ' ' << k.x << ' ' << k.y << endl; + // 计算位置key + k.posKey = BamWrap::bam_global_pos(k.read1ReferenceIndex, k.read1Coordinate); // << 1 | k.orientation; +} + +/** + * Takes a list of ReadEndsForMarkDuplicates objects and identify the representative read based on + * quality score. For all members of the duplicate set, add the read1 index-in-file of the representative + * read to the records of the first and second in a pair. This value becomes is used for + * the 'DI' tag. + */ +static void addRepresentativeReadIndex(vector &vpRe) +{ +} + +/* 处理一组pairend的readends,标记冗余 */ +static void markDuplicatePairs(int64_t posKey, + vector &vpRe, + map> *pmsDupIdx, + map> *pmsOpticalDupIdx) +{ + if (vpRe.size() < 2) + { + if (vpRe.size() == 1) + { + // addSingletonToCount(libraryIdGenerator); + } + return; + } + // cout << "pos:" << posKey + 1 << ";size:" << vpRe.size() << endl; + auto &sDupIdx = (*pmsDupIdx)[posKey]; + auto &sOpticalDupIdx = (*pmsOpticalDupIdx)[posKey]; + int maxScore = 0; + const ReadEnds *pBest = nullptr; + /** All read ends should have orientation FF, FR, RF, or RR **/ + for (auto pe : vpRe) // 找分数最高的readend + { + if (pe->score > maxScore || pBest == nullptr) + { + maxScore = pe->score; + pBest = pe; + } + } + if (!g_mdArg.READ_NAME_REGEX.empty()) // 检查光学冗余 + { + // trackOpticalDuplicates + } + for (auto pe : vpRe) // 对非best read标记冗余 + { + if (pe != pBest) // 非best + { + sDupIdx.insert(pe->read1IndexInFile); // 添加read1 + if (pe->read2IndexInFile != pe->read1IndexInFile) + sDupIdx.insert(pe->read2IndexInFile); // 添加read2 + } + } + + if (g_mdArg.TAG_DUPLICATE_SET_MEMBERS) + { + addRepresentativeReadIndex(vpRe); + } +} + +/* 处理一组非paired的readends,标记冗余 */ +static void markDuplicateFragments(int64_t posKey, + vector &vpRe, + bool containsPairs, + map> *pmsDupIdx) +{ + auto &sDupIdx = (*pmsDupIdx)[posKey]; + + if (containsPairs) + { + for (auto pe : vpRe) + { + if (!pe->IsPaired()) + { + sDupIdx.insert(pe->read1IndexInFile); + } + } + } + else + { + int maxScore = 0; + const ReadEnds *pBest = nullptr; + for (auto pe : vpRe) + { + if (pe->score > maxScore || pBest == nullptr) + { + maxScore = pe->score; + pBest = pe; + } + } + + for (auto pe : vpRe) + { + if (pe != pBest) + { + sDupIdx.insert(pe->read1IndexInFile); + } + } + } +} + +/* 处理位于某个坐标的pairend reads */ +static inline void handlePairs(int64_t posKey, + set &sReadEnds, + vector &vpCache, + map> *pmsDupIdx, + map> *pmsOpticalDupIdx) +{ + if (sReadEnds.size() > 1) // 有潜在的冗余 + { + vpCache.clear(); + const ReadEnds *pReadEnd = nullptr; + for (auto &re : sReadEnds) + { + if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) // 跟前一个一样 + vpCache.push_back(&re); // 处理一个潜在的冗余组 + else + { + markDuplicatePairs(posKey, vpCache, pmsDupIdx, pmsOpticalDupIdx); // 不一样 + vpCache.clear(); + vpCache.push_back(&re); + pReadEnd = &re; + } + } + markDuplicatePairs(posKey, vpCache, pmsDupIdx, pmsOpticalDupIdx); + } +} + +/* 处理位于某个坐标的 reads */ +static inline void handleFrags( + int64_t posKey, + set &sReadEnds, + vector &vpCache, + map> *pmsDupIdx) +{ + if (sReadEnds.size() > 1) // 有潜在的冗余 + { + vpCache.clear(); + const ReadEnds *pReadEnd = nullptr; + bool containsPairs = false; + bool containsFrags = false; + for (auto &re : sReadEnds) + { + if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, false)) + { + vpCache.push_back(&re); + containsPairs = containsPairs || re.IsPaired(); + containsFrags = containsFrags || !re.IsPaired(); + } + else + { + if (vpCache.size() > 1 && containsFrags) + { + markDuplicateFragments(posKey, vpCache, containsPairs, pmsDupIdx); + } + vpCache.clear(); + vpCache.push_back(&re); + pReadEnd = &re; + containsPairs = re.IsPaired(); + containsFrags = !re.IsPaired(); + } + } + if (vpCache.size() > 1 && containsFrags) + { + markDuplicateFragments(posKey, vpCache, containsPairs, pmsDupIdx); + } + } +} + +/* 对找到的pairend read end添加一些信息 */ +static inline void modifyPairedEnds(ReadEnds &fragEnd, ReadEnds *pPairedEnds) +{ + auto &pairedEnds = *pPairedEnds; + int64_t bamIdx = fragEnd.read1IndexInFile; + const int matesRefIndex = fragEnd.read1ReferenceIndex; + const int matesCoordinate = fragEnd.read1Coordinate; + // Set orientationForOpticalDuplicates, which always goes by the first then the second end for the strands. NB: must do this + // before updating the orientation later. + if (fragEnd.read1FirstOfPair) + { + pairedEnds.orientationForOpticalDuplicates = + ReadEnds::GetOrientationByte(fragEnd.IsNegativeStrand(), pairedEnds.orientation == ReadEnds::R); + } + else + { + pairedEnds.orientationForOpticalDuplicates = + ReadEnds::GetOrientationByte(pairedEnds.orientation == ReadEnds::R, fragEnd.IsNegativeStrand()); + } + // If the other read is actually later, simply add the other read's data as read2, else flip the reads + if (matesRefIndex > pairedEnds.read1ReferenceIndex || + (matesRefIndex == pairedEnds.read1ReferenceIndex && matesCoordinate >= pairedEnds.read1Coordinate)) + { + pairedEnds.read2ReferenceIndex = matesRefIndex; + pairedEnds.read2Coordinate = matesCoordinate; + pairedEnds.read2IndexInFile = bamIdx; + pairedEnds.orientation = ReadEnds::GetOrientationByte(pairedEnds.orientation == ReadEnds::R, + fragEnd.IsNegativeStrand()); + + // if the two read ends are in the same position, pointing in opposite directions, + // the orientation is undefined and the procedure above + // will depend on the order of the reads in the file. + // To avoid this, we set it explicitly (to FR): + if (pairedEnds.read2ReferenceIndex == pairedEnds.read1ReferenceIndex && + pairedEnds.read2Coordinate == pairedEnds.read1Coordinate && + pairedEnds.orientation == ReadEnds::RF) + { + pairedEnds.orientation = ReadEnds::FR; + } + } + else + { + pairedEnds.read2ReferenceIndex = pairedEnds.read1ReferenceIndex; + pairedEnds.read2Coordinate = pairedEnds.read1Coordinate; + pairedEnds.read2IndexInFile = pairedEnds.read1IndexInFile; + pairedEnds.read1ReferenceIndex = matesRefIndex; + pairedEnds.read1Coordinate = matesCoordinate; + pairedEnds.read1IndexInFile = bamIdx; + pairedEnds.orientation = ReadEnds::GetOrientationByte(fragEnd.IsNegativeStrand(), + pairedEnds.orientation == ReadEnds::R); + pairedEnds.posKey = fragEnd.posKey; + } + pairedEnds.score += fragEnd.score; +} \ No newline at end of file diff --git a/src/sam/markdups/parallel_md.h b/src/sam/markdups/parallel_md.h new file mode 100644 index 0000000..3dd77ca --- /dev/null +++ b/src/sam/markdups/parallel_md.h @@ -0,0 +1,540 @@ + + +/* 多线程处理冗余参数结构体 */ +struct ThMarkDupArg +{ + int64_t bamStartIdx; // 当前vBam数组中第一个bam记录在整体bam中所处的位置 + long seq; // 当前任务在所有任务的排序 + bool more; // 后面还有任务 + volatile bool finish; // 当前任务有没有处理完 + vector vBam; // 存放待处理的bam read + map> mvPair; // 以冗余位置为索引,保存所有pairend reads + map> mvFrag; // 保存所有reads,包括pairend + map> msPairDupIdx; // pair的冗余read的索引 + map> msPairOpticalDupIdx; // optical冗余read的索引 + map> msFragDupIdx; // frag的冗余read的索引 + map> msFragOpticalDupIdx; // 这个好像没用 + unordered_map umReadEnds; // 用来寻找pair end +}; + +/* + * 多线程查找和标记冗余函数 + */ +void thread_markdups(void *arg, int tid) +{ + auto &p = *(ThMarkDupArg *)arg; + + /* 处理每个read,创建ReadEnd,并放入frag和pair中 */ + for (int i = 0; i < p.vBam.size(); ++i) // 循环处理每个read + { + BamWrap *bw = p.vBam[i]; + const int64_t bamIdx = p.bamStartIdx + i; + if (bw->GetReadUnmappedFlag()) + { + if (bw->b->core.tid == -1) + // When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort). + break; + } + else if (!bw->IsSecondaryOrSupplementary()) // 是主要比对 + { + ReadEnds fragEnd; + buildReadEnds(*bw, bamIdx, g_vRnParser[tid], &fragEnd); + + // if (fragEnd.read1Coordinate == 69662) + // { + // cout << fragEnd.read1Coordinate << '\t' << bw->GetUnclippedEnd() << '\t' + // << bw->GetUnclippedStart() << '\t' << bw->query_name() << '\t' + // << bw->cigar_str() << '\t' << bw->b->core.pos << '\t' << + // bw->b->core.mpos << endl; + // } + + p.mvFrag[fragEnd.posKey].push_back(fragEnd); // 添加进frag集合 + if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) // 是pairend而且互补的read也比对上了 + { + // if (bw->b->core.pos == 69813 || bw->b->core.pos == 69884) + // { + // cout << fragEnd.read1Coordinate << '\t' << bw->query_name() << endl; + // } + + string key = bw->query_name(); + if (p.umReadEnds.find(key) == p.umReadEnds.end()) + { + p.umReadEnds[key] = fragEnd; + } + else // 找到了pairend + { + auto &pairedEnds = p.umReadEnds.at(key); + modifyPairedEnds(fragEnd, &pairedEnds); + p.mvPair[pairedEnds.posKey].push_back(pairedEnds); + p.umReadEnds.erase(key); // 删除找到的pairend + // if (pairedEnds.read1Coordinate == 69662) + //{ + // cout << pairedEnds.posKey << endl; + // cout << pairedEnds.read1Coordinate << '\t' + // << pairedEnds.read2Coordinate << '\t' + // << (int)pairedEnds.orientation << endl; + // //<< fragEnd.read1Coordinate << '\t' + // //<< fragEnd.posKey << '\t' + // //<< bw->b->core.tid << '\t' << bw->b->core.pos << '\t' + // //<< bw->GetUnclippedEnd() << '\t' << bw->GetUnclippedStart() << endl; + // } + } + } + } + } + /* generateDuplicateIndexes,计算冗余read在所有read中的位置索引 */ + unordered_set usUnpairedPos; // 该位置有还未找到pair的read + for (auto &ele : p.umReadEnds) + { + usUnpairedPos.insert(ele.second.posKey); + } + // 先处理 pair + int dupNum = 0; + vector vRePotentialDup; // 有可能是冗余的reads + for (auto &e : p.mvPair) // 按比对的位置先后进行遍历 + { + // 如果这个位置里所有的read,没有缺少pair的,那么就处理,否则跳过 + // if (usUnpairedPos.find(e.first) == usUnpairedPos.end()) + // handlePairs(e.first, e.second, vRePotentialDup, &p.msPairDupIdx, &p.msPairOpticalDupIdx); + } + // 再处理frag + for (auto &e : p.mvFrag) + { + // handleFrags(e.first, e.second, vRePotentialDup, &p.msFragDupIdx, &p.msFragOpticalDupIdx); + } + + // cout << tid << '\t' << "dup: " << dupNum << endl; + // cout << tid << " all: no: " << p.vBam.size() << '\t' << p.umReadEnds.size() << endl; + /* 本段数据处理完成,告诉输出线程 */ + if (!g_serialProcess) + { + POSSESS(g_queueFirstLock); + p.finish = true; + // cout << tid << ": process: " << p.seq << endl; + auto front = g_qpThMarkDupArg.front(); + if (g_qpThMarkDupArg.size() > 0) // 表明是并行处理 + { + if (front->finish) + { + TWIST(g_queueFirstLock, TO, front->seq); // 通知写线程,当前队列头部完成的任务 + } + else + { + RELEASE(g_queueFirstLock); + } + } + } +} + +/* + * 多线程将结果写入文件,写之前需要合并相邻线程的未处理的结果 + * 1. pre和cur,重叠的部分合并进pre,删除cur中保存的对应的数据和结果 + * 2. 保留全局没有匹配上的pairend数据,从pre结果中删除 + * 3. 每次循环,处理一次mvUnpaired数据,如果对应的位置没有未匹配的,则计算结果,然后删除掉该位置 + */ +void thread_merge(void *) +{ + bool more = false; // 是否还有下一个任务 + long seq = 0; // 任务序列号 + long unPairedNum = 0; // 没有找到pair的总数量 + long dupReadNum = 0; // 冗余read总数量 + POSSESS(g_queueFirstLock); + WAIT_FOR(g_queueFirstLock, TO_BE, seq++); // 等待首个任务完成 + auto lastP = g_qpThMarkDupArg.front(); // 取队首的数据 + + // unordered_map umUnpairedReadEnds; // 还未找到pair的read + map> mvUnPaired; // 这些坐标对应的reads里,有还没找到pair的 + unordered_set usUnpairedPos; // 上一轮中不需要计算的点(unpaired) + unordered_map> mvPair; // 上一轮中遗留的需要重新计算的pair(包括重叠和没paired) + unordered_map> mvFrag; // 上一轮中需要重新计算的frag(与这一轮位置有重叠的) + vector vRePotentialDup; // 有可能是冗余的reads + map> msPairDupIdx; // pair的冗余read的索引 + map> msPairOpticalDupIdx; // optical冗余read的索引 + auto umUnpairedReadEnds = lastP->umReadEnds; + auto p = lastP; + g_qpThMarkDupArg.pop(); // 删除队首 + TWIST(g_queueFirstLock, TO, seq); // 解锁 + more = lastP->more; // 是否还有下一个任务 + while (more) // 循环处理,将结果写入文件 + { + POSSESS(g_queueFirstLock); + if (g_qpThMarkDupArg.empty()) // 有可能新任务没来得及添加进队列 + { + RELEASE(g_queueFirstLock); + continue; + } + WAIT_FOR(g_queueFirstLock, TO_BE, seq); // 等待任务完成 + p = g_qpThMarkDupArg.front(); + if (!p->finish) // 有可能这个任务没有完成,是下边那个TWIST导致进到这里,因为这一段代码可能运行比较快 + { + TWIST(g_queueFirstLock, TO, -1); // 此时队首任务没完成,-1可以让锁无法进入到这里,避免无效获得锁 + continue; + } + g_qpThMarkDupArg.pop(); + TWIST(g_queueFirstLock, TO, seq + 1); + /* 处理结果数据 */ + // cout << "finish: " << seq - 1 << '\t' << "lastIdx: " << p->bamStartIdx+p->vBam.size() << endl; + // 找paired中重叠的部分 + mvPair.clear(); + mvFrag.clear(); + usUnpairedPos.clear(); + + int64_t lastPairPos = lastP->mvPair.rbegin()->first; // 上一轮read最后到达的坐标 + for (auto &pair : p->mvPair) + { + const int64_t pos = pair.first; + if (pos > lastPairPos) // 超过了上一个任务最大的位点坐标,那么就不再继续检查了 + break; + mvPair.insert(pair); // 保存此轮任务当前位点的数据 + clearIdxAtPos(pos, &p->msPairDupIdx); // 清除该位点的冗余结果 + clearIdxAtPos(pos, &p->msPairOpticalDupIdx); + if (lastP->mvPair.find(pos) != lastP->mvPair.end()) // 上一个任务同一个位点也有数据 + { + mvPair[pos].insert(mvPair[pos].end(), lastP->mvPair[pos].begin(), lastP->mvPair[pos].end()); + clearIdxAtPos(pos, &lastP->msPairDupIdx); // 清除该位点的冗余结果 + clearIdxAtPos(pos, &lastP->msPairOpticalDupIdx); + } + } + // 找上一轮中没配对的pair + for (auto itr = p->umReadEnds.begin(); itr != p->umReadEnds.end();) // 在当前任务中找有没有与上一个任务中没匹配的read,相匹配的pair + { + auto key = itr->second.posKey; + auto &fragEnd = itr->second; + if (lastP->umReadEnds.find(itr->first) != lastP->umReadEnds.end()) + { + auto &pairedEnds = lastP->umReadEnds.at(itr->first); + modifyPairedEnds(fragEnd, &pairedEnds); + mvPair[key].push_back(pairedEnds); + lastP->umReadEnds.erase(itr->first); // 删除 + if (umUnpairedReadEnds.find(itr->first) != umUnpairedReadEnds.end()) + umUnpairedReadEnds.erase(itr->first); + itr = p->umReadEnds.erase(itr); // 删除本轮中对应的unpaired read + } + else + { + if (umUnpairedReadEnds.find(itr->first) != umUnpairedReadEnds.end()) // 前边一直没找到paired的数据,在这一轮找到了 + { + auto &pairedEnds = umUnpairedReadEnds.at(itr->first); + modifyPairedEnds(fragEnd, &pairedEnds); + mvUnPaired[key].push_back(pairedEnds); + umUnpairedReadEnds.erase(itr->first); // 删除 + itr = p->umReadEnds.erase(itr); // 删除本轮中对应的unpaired read + } + else // 新的没配对的 + { + umUnpairedReadEnds.insert(*itr); // 没有pair,则添加 + auto &v = p->mvPair[key]; + mvUnPaired[key].insert(mvUnPaired[key].end(), v.begin(), v.end()); + ++itr; + } + } + } + // 计算上一轮还需要计算的pair + for (auto &e : lastP->umReadEnds) + usUnpairedPos.insert(e.second.posKey); + for (auto &e : mvPair) + { + //if (usUnpairedPos.find(e.first) == usUnpairedPos.end()) + // handlePairs(e.first, e.second, vRePotentialDup, &lastP->msPairDupIdx, &lastP->msPairOpticalDupIdx); + } + + // 计算多轮之后遗留的pair + usUnpairedPos.clear(); + for (auto &e : umUnpairedReadEnds) + usUnpairedPos.insert(e.second.posKey); + for (auto itr = mvUnPaired.begin(); itr != mvUnPaired.end();) + { + if (usUnpairedPos.find(itr->first) == usUnpairedPos.end()) + { + // handlePairs(itr->first, itr->second, vRePotentialDup, &msPairDupIdx, &msPairOpticalDupIdx); + // itr = mvUnPaired.erase(itr); + } + else + { + ++itr; + } + } + + // 找上一轮重叠的frag + int64_t lastFragPos = lastP->mvFrag.rbegin()->first; + for (auto &pair : p->mvFrag) + { + const int64_t pos = pair.first; + if (pos > lastPairPos) // 超过了上一个任务最大的位点坐标,那么就不再继续检查了 + break; + mvFrag.insert(pair); // 保存此轮任务当前位点的数据 + clearIdxAtPos(pos, &p->msFragDupIdx); // 清除该位点的冗余结果 + if (lastP->mvFrag.find(pos) != lastP->mvFrag.end()) // 上一个任务同一个位点也有数据 + { + mvFrag[pos].insert(mvFrag[pos].end(), lastP->mvFrag[pos].begin(), lastP->mvFrag[pos].end()); + clearIdxAtPos(pos, &lastP->msFragDupIdx); // 上一个任务该位点有冗余结果 + } + } + for (auto &e : mvFrag) + { + // handleFrags(e.first, e.second, vRePotentialDup, &lastP->msFragDupIdx, &lastP->msFragOpticalDupIdx); + } + + // 计算冗余数量 + for (auto &ele : lastP->msPairDupIdx) + dupReadNum += ele.second.size(); + for (auto &ele : lastP->msFragDupIdx) + dupReadNum += ele.second.size(); + + /* 更新处理完的read数量和状态 */ + POSSESS(g_readyToReadLock); + g_bamUsedNum += lastP->vBam.size(); + // cout << "write: " << g_qpThMarkDupArg.size() << endl; + if (g_qpThMarkDupArg.size() <= g_jobNumForRead) + { + TWIST(g_readyToReadLock, TO, 1); + } + else + { + RELEASE(g_readyToReadLock); + } + /* 准备下一轮循环 */ + delete lastP; + more = p->more; + lastP = p; + seq++; + // cout << "unpaired: " << lastP->umReadEnds.size() << endl; + } + + unPairedNum = umUnpairedReadEnds.size(); + // 计算冗余数量 + for (auto &ele : lastP->msPairDupIdx) + dupReadNum += ele.second.size(); + for (auto &ele : lastP->msFragDupIdx) + dupReadNum += ele.second.size(); + + // 多轮遗留的 + for (auto &ele : msPairDupIdx) + dupReadNum += ele.second.size(); + + // cout << "Finally unpaired read num: " << unPairedNum << endl; + // cout << "Finally dulicate read num: " << dupReadNum << endl; + // cout << "last unpaired num: " << lastP->umReadEnds.size() << endl; + int unpairedReads = 0; + for (auto &e : mvUnPaired) + unpairedReads += e.second.size(); + // cout << "mvUnpaired size: " << mvUnPaired.size() << endl; + // cout << "mvUnpaired vector size: " << unpairedReads << endl; + + // 最后一个数据处理完了 + POSSESS(g_readyToReadLock); + g_bamUsedNum += lastP->vBam.size(); + TWIST(g_readyToReadLock, TO, 1); + // cout << "last finish: " << seq - 1 << endl; + delete lastP; + pthread_exit(0); +} + +/* 串行运行 */ +void serialProcessData(BamBufType &inBamBuf) +{ + ThMarkDupArg p({0, + 0, + true, + true, + inBamBuf.Slice(0, inBamBuf.Size())}); + thread_markdups(&p, 0); + /* generateDuplicateIndexes,计算冗余read在所有read中的位置索引 */ + unordered_set usUnpairedPos; // 该位置有还未找到pair的read + for (auto &ele : p.umReadEnds) + { + usUnpairedPos.insert(ele.second.posKey); + } + // 先处理 pair + int dupNum = 0; + vector vRePotentialDup; // 有可能是冗余的reads + for (auto &e : p.mvPair) // 按比对的位置先后进行遍历 + { + // 如果这个位置里所有的read,没有缺少pair的,那么就处理,否则跳过 + // if (usUnpairedPos.find(e.first) != usUnpairedPos.end()) + // handlePairs(e.first, e.second, vRePotentialDup, &p.msPairDupIdx, &p.msPairOpticalDupIdx); + } + + // int64_t estimateDupNum = 0; + // for (auto &e : p.mvPair) + // { + // int FF = 0; + // int FR = 0; + // int RF = 0; + // int RR = 0; + // for (auto &re : e.second) + // { + // if (re.orientation == ReadEnds::FF) + // FF++; + // else if (re.orientation == ReadEnds::FR) + // FR++; + // else if (re.orientation == ReadEnds::RF) + // RF++; + // else + // RR++; + // } + // if (FF > 1) { + // estimateDupNum += (FF - 1); + // } + // if (FR > 1) + // { + // estimateDupNum += (FR - 1); + // } + // if (RF > 1) + // { + // estimateDupNum += (RF - 1); + // } + // if (RR > 1) + // { + // estimateDupNum += (RR - 1); + // } + // } + + // cout << "Estimate dup num: " << estimateDupNum << endl; + + // cout << "Finally unpaired read num: " << p.umReadEnds.size() << endl; + int dupReadNum = 0; + for (auto &ele : p.msPairDupIdx) + dupReadNum += ele.second.size(); + cout << "pair dulicate read num: " << dupReadNum << endl; + for (auto &ele : p.msFragDupIdx) + dupReadNum += ele.second.size(); + cout << "Finally dulicate read num: " << dupReadNum << endl; + + // int pairNum = 0, fragNum = 0; + // for (auto &e : p.mvPair) + // pairNum += e.second.size(); + // for (auto &e : p.mvFrag) + // fragNum += e.second.size(); + // cout << "pair num: " << pairNum << endl; + // cout << "frag num: " << fragNum << endl; +} + +static void parallelMarkDups() +{ + + // /* 读取缓存初始化 */ + BamBufType inBamBuf(g_gArg.use_asyncio); + inBamBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem); + + /* 循环读入信息,并处理 */ + g_maxJobNum = g_gArg.num_threads * 10; + // g_maxJobNum = g_gArg.num_threads * 3; + g_jobNumForRead = g_gArg.num_threads * 2; + + int64_t x_all = 0; // for test + int64_t jobSeq = 0; + int64_t processedBamNum = 0; // 记录每个轮次累计处理的reads数量,用来计算每个read在整个文件中的索引位置 + threadpool thpool; + thread *mergeth; + if (g_gArg.num_threads == 1) + { + g_serialProcess = true; + } + else + { + thpool = thpool_init(g_gArg.num_threads); // 创建mark dup所需的线程池 + mergeth = LAUNCH(thread_merge, nullptr); // 启动处理结果的的线程,合并所有线程的结果 + } + const int minReadNum = 1000000; // 最小并行处理的read数量 + int bamRemainSize = 0; // 上一轮还剩下的bam数量,包含已经在任务里的和没有放进任务的 + int numReadsForEachJob = 0; // 每个线程处理的read数量,第一次读取的时候进行设置 + int lastRoundUnProcessed = 0; // 上一轮没有放进任务里的read数量 + int curRoundProcessed = 0; // 这一轮放进任务的read数量 + while (inBamBuf.ReadStat() >= 0) + { + /* 读取bam文件中的read */ + size_t readNum = inBamBuf.ReadBam(); + cout << readNum << endl; // 这一轮读取的bam数量 + + // if (readNum <= minReadNum) + // { + // serialProcess = true; + // // 调用串行运行代码 + // serialProcessData(inBamBuf); + // break; + // } + if (g_serialProcess) + { + tm_arr[0].acc_start(); + serialProcessData(inBamBuf); + tm_arr[0].acc_end(); + inBamBuf.ClearAll(); + continue; + } + + if (numReadsForEachJob == 0) + numReadsForEachJob = readNum / g_maxJobNum; // 第一次读取bam的时候进行设置 + g_bamLoadedNum += readNum; + + /* 多线程处理 任务数是线程数的10倍 */ + + curRoundProcessed = 0; // 当前轮次已经处理的reads数量 + int numNeedToProcess = inBamBuf.Size() - bamRemainSize + lastRoundUnProcessed; // 当前需要处理的bam数量 + for (int i = 0; numNeedToProcess >= numReadsForEachJob; ++i) // 只有待处理的reads数量大于一次任务的数量时,新建任务 + { + int startIdx = i * numReadsForEachJob + bamRemainSize - lastRoundUnProcessed; + int endIdx = (i + 1) * numReadsForEachJob + bamRemainSize - lastRoundUnProcessed; + + ThMarkDupArg *thArg = new ThMarkDupArg({processedBamNum + curRoundProcessed, + jobSeq++, + true, + false, + inBamBuf.Slice(startIdx, endIdx)}); + POSSESS(g_queueFirstLock); // 加锁 + g_qpThMarkDupArg.push(thArg); // 将新任务需要的参数添加到队列 + RELEASE(g_queueFirstLock); // 解锁 + thpool_add_work(thpool, thread_markdups, (void *)thArg); // 添加新任务 + curRoundProcessed += endIdx - startIdx; + numNeedToProcess -= numReadsForEachJob; + } + processedBamNum += curRoundProcessed; + lastRoundUnProcessed = numNeedToProcess; + + /* 等待可以继续读取的信号 */ + POSSESS(g_readyToReadLock); + WAIT_FOR(g_readyToReadLock, TO_BE, 1); + bamRemainSize = g_bamLoadedNum - g_bamUsedNum; + + while (bamRemainSize >= inBamBuf.Size() / 2) + { // 要保留的多于现在有的bam数量的一半,那就等待write线程继续处理 + TWIST(g_readyToReadLock, TO, 0); + POSSESS(g_readyToReadLock); + WAIT_FOR(g_readyToReadLock, TO_BE, 1); + bamRemainSize = g_bamLoadedNum - g_bamUsedNum; + } + inBamBuf.ClearBeforeIdx(inBamBuf.Size() - bamRemainSize); // 清理掉已经处理完的reads + // cout << g_bamLoadedNum << '\t' << g_bamUsedNum << '\t' << bamRemainSize << '\t' << inBamBuf.Size() << endl; + TWIST(g_readyToReadLock, TO, 0); + } + if (!g_serialProcess) + { + /* 数据读完了,放一个空的任务,好让write thread停下来 */ + ThMarkDupArg *thArg = nullptr; + if (lastRoundUnProcessed > 0) // 最后一轮还有没有添加进任务的read数据 + { + thArg = new ThMarkDupArg({processedBamNum + curRoundProcessed, jobSeq++, false, false, + inBamBuf.Slice(inBamBuf.Size() - lastRoundUnProcessed, inBamBuf.Size())}); + processedBamNum += lastRoundUnProcessed; + } + else + { + thArg = new ThMarkDupArg({0, jobSeq++, false, false}); + } + POSSESS(g_queueFirstLock); // 加锁 + g_qpThMarkDupArg.push(thArg); // 将新任务需要的参数添加到队列 + RELEASE(g_queueFirstLock); // 解锁 + thpool_add_work(thpool, thread_markdups, (void *)thArg); // 添加新任务 + + /* 同步所有线程 */ + thpool_wait(thpool); + thpool_destroy(thpool); + JOIN(mergeth); + } + inBamBuf.FreeMemory(); + + cout << "x_all: " << x_all << endl; + cout << "loaded: " << g_bamLoadedNum << endl; + cout << " used: " << g_bamUsedNum << endl; + cout << "processedBamNum: " << processedBamNum << endl; +} \ No newline at end of file diff --git a/src/sam/markdups/serial_md.h b/src/sam/markdups/serial_md.h new file mode 100644 index 0000000..65efc5e --- /dev/null +++ b/src/sam/markdups/serial_md.h @@ -0,0 +1,418 @@ +/* 单线程处理冗余参数结构体 */ +struct SerailMarkDupArg +{ + int64_t bamStartIdx; // 当前vBam数组中第一个bam记录在整体bam中所处的位置 + vector vBam; // 存放待处理的bam read + map> msPair; // 以冗余位置为索引,保存所有pairend reads + map> msFrag; // 保存所有reads,包括pairend + map> msPairDupIdx; // pair的冗余read的索引 + map> msPairOpticalDupIdx; // optical冗余read的索引 + map> msFragDupIdx; // frag的冗余read的索引 + unordered_map unpairedDic; // 用来寻找pair end +}; + +/* 全局保留的数据,因为有些paired数据比对到了不同的染色体,相距甚远 */ +struct GlobalDataArg +{ + map> msPair; // 以冗余位置为索引,保存所有pairend reads + map> msPairDupIdx; // pair的冗余read的索引 + map> msPairOpticalDupIdx; // optical冗余read的索引 + unordered_map unpairedDic; // 用来寻找pair end + set dupIdx; + unordered_set opticalDupIdx; + map> test; +}; + +static GlobalDataArg gData; + +/* 删除某个位点的pairend的冗余idx */ +static void rmPairIdxAtPos(const int64_t pos, SerailMarkDupArg *lastArg, SerailMarkDupArg *curArg) +{ + delIdxAtPos(pos, &curArg->msPairDupIdx); // 删除该位点的冗余结果 + delIdxAtPos(pos, &curArg->msPairOpticalDupIdx); + clearIdxAtPos(pos, &lastArg->msPairDupIdx); // 清除该位点的冗余结果 + clearIdxAtPos(pos, &lastArg->msPairOpticalDupIdx); +} + +static void clearPairIdxAtPos(const int64_t pos, SerailMarkDupArg *task) +{ + clearIdxAtPos(pos, &task->msPairDupIdx); // 删除该位点的冗余结果 + clearIdxAtPos(pos, &task->msPairOpticalDupIdx); +} + +static void clearPairIdxAtPos(const int64_t pos, + map> *dupIdx, + map> *opticalDupIdx) +{ + clearIdxAtPos(pos, dupIdx); // 删除该位点的冗余结果 + clearIdxAtPos(pos, opticalDupIdx); +} + +/* 删除某个位点的frag的冗余idx */ +static void rmFragIdxAtPos(const int64_t pos, SerailMarkDupArg *lastArg, SerailMarkDupArg *curArg) +{ + delIdxAtPos(pos, &curArg->msFragDupIdx); // 删除该位点的冗余结果 + clearIdxAtPos(pos, &lastArg->msFragDupIdx); // 清除该位点的冗余结果 +} + +/* 单线程生成readends (第一步)*/ +static void generateReadEnds(SerailMarkDupArg *arg) +{ + auto &p = *arg; + auto &rnParser = g_vRnParser[0]; + /* 处理每个read,创建ReadEnd,并放入frag和pair中 */ + for (int i = 0; i < p.vBam.size(); ++i) // 循环处理每个read + { + BamWrap *bw = p.vBam[i]; + const int64_t bamIdx = p.bamStartIdx + i; + if (bw->GetReadUnmappedFlag()) + { + if (bw->b->core.tid == -1) + // When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort). + break; + } + else if (!bw->IsSecondaryOrSupplementary()) // 是主要比对 + { + ReadEnds fragEnd; + buildReadEnds(*bw, bamIdx, rnParser, &fragEnd); + // if (fragEnd.posKey == 3547574 || fragEnd.posKey == 3547930) + // { + // cout << fragEnd.posKey << '\t' << bw->query_name() << endl; + // } + p.msFrag[fragEnd.posKey].insert(fragEnd); // 添加进frag集合 + if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) // 是pairend而且互补的read也比对上了 + { + string key = bw->query_name(); + if (p.unpairedDic.find(key) == p.unpairedDic.end()) + { + p.unpairedDic[key] = fragEnd; + } + else // 找到了pairend + { + auto &pairedEnds = p.unpairedDic.at(key); + modifyPairedEnds(fragEnd, &pairedEnds); + p.msPair[pairedEnds.posKey].insert(pairedEnds); + p.unpairedDic.erase(key); // 删除找到的pairend + } + } + } + } +} + +/* 单线程markdup (第二步)*/ +static void markdups(SerailMarkDupArg *arg) +{ + auto &p = *arg; + /* generateDuplicateIndexes,计算冗余read在所有read中的位置索引 */ + unordered_set usUnpairedPos; // 该位置有还未找到pair的read + for (auto &ele : p.unpairedDic) + { + usUnpairedPos.insert(ele.second.posKey); + } + // 先处理 pair + vector vRePotentialDup; // 有可能是冗余的reads + for (auto &e : p.msPair) // 按比对的位置先后进行遍历 + { + handlePairs(e.first, e.second, vRePotentialDup, &p.msPairDupIdx, &p.msPairOpticalDupIdx); + } + + // 再处理frag + for (auto &e : p.msFrag) + { + handleFrags(e.first, e.second, vRePotentialDup, &p.msFragDupIdx); + } +} + +// 将pair set中的pair添加进另一个pair set +static void mergeToPairSet(int64_t pos, map> &src, set *dst) +{ + if (src.find(pos) != src.end()) + { + for (auto &pe : src[pos]) + { + dst->insert(pe); + } + // src.erase(pos); + } +} + +/* 处理相邻的两个任务,有相交叉的数据 */ +static void handleIntersectData(SerailMarkDupArg *lastArg, SerailMarkDupArg *curArg, GlobalDataArg *gDataArg) +{ + auto &lp = *lastArg; + auto &p = *curArg; + auto &g = *gDataArg; + + vector vRePotentialDup; // 有可能是冗余的reads + + int64_t lastPairPos = 0; + if (lp.msPair.size() > 0) + lastPairPos = lp.msPair.rbegin()->first; // 上一轮read最后到达的坐标 + for (auto &pair : p.msPair) // 重叠的pair + { + int64_t pos = pair.first; + if (pos > lastPairPos) // 超过了上一个任务最大的位点坐标,那么就不再继续检查了 + break; + if (lp.msPair.find(pos) != lp.msPair.end()) // 上一个任务里也有这个位点,两个任务在相同的点位上都有数据,则需要重新计算该点位 + { + auto &pairedSet = lp.msPair[pos]; + rmPairIdxAtPos(pos, &lp, &p); + for (auto &curPair : pair.second) // 改变了lp当前位点的paired set + pairedSet.insert(curPair); + handlePairs(pos, pairedSet, vRePotentialDup, &lp.msPairDupIdx, &lp.msPairOpticalDupIdx); // 把结果放在上个任务里 + } + } + for (auto &unpair : lp.unpairedDic) // 上一个任务中没有找到匹配的pair + { + auto &readName = unpair.first; + auto &unpairEnd = unpair.second; + + if (p.unpairedDic.find(readName) != p.unpairedDic.end()) // 在当前任务中找到了匹配的pairend + { + auto &fe = p.unpairedDic.at(readName); + modifyPairedEnds(fe, &unpairEnd); + int64_t pos = unpairEnd.posKey; + auto &pairedSet = lp.msPair[pos]; + rmPairIdxAtPos(pos, &lp, &p); + pairedSet.insert(unpairEnd); // 改变了lp当前位点的paired set + mergeToPairSet(pos, p.msPair, &pairedSet); // 将当前任务在该位点的数据合并进pairset + handlePairs(pos, pairedSet, vRePotentialDup, &lp.msPairDupIdx, &lp.msPairOpticalDupIdx); + p.unpairedDic.erase(readName); + } + else if (g.unpairedDic.find(readName) != g.unpairedDic.end()) // 在全局中找 + { + auto &prePe = g.unpairedDic.at(readName); + modifyPairedEnds(unpairEnd, &prePe); + int64_t pos = prePe.posKey; + rmPairIdxAtPos(pos, &lp, &p); + clearPairIdxAtPos(pos, &g.msPairDupIdx, &g.msPairOpticalDupIdx); + auto &prePairSet = g.msPair[pos]; + prePairSet.insert(prePe); + mergeToPairSet(pos, lp.msPair, &prePairSet); + mergeToPairSet(pos, p.msPair, &prePairSet); + + // if (pos == 3547574) + // { + // cout <<"here-1: " << pos << '\t' << prePairSet.size() << '\t' << readName << '\t' + // << (p.msPair.find(pos) != p.msPair.end()) << '\t' + // << (p.unpairedDic.find(readName) != p.unpairedDic.end()) << '\t' + // << (g.unpairedDic.find(readName) != g.unpairedDic.end()) << endl; + // } + + handlePairs(pos, prePairSet, vRePotentialDup, &g.msPairDupIdx, &g.msPairOpticalDupIdx); + g.unpairedDic.erase(readName); + // g.msPair.erase(pos); + } + else // 插入全局数据中 + { + int64_t pos = unpairEnd.posKey; + rmPairIdxAtPos(pos, &lp, &p); + g.unpairedDic.insert(unpair); + mergeToPairSet(pos, lp.msPair, &g.msPair[pos]); + mergeToPairSet(pos, p.msPair, &g.msPair[pos]); + // if (pos == 3547574) + // { + // cout << "here-3: " << pos << '\t' << g.msPair[pos].size() << '\t' << readName << '\t' + // << lp.msPair[pos].size() << '\t' + // << p.msPair[pos].size() << '\t' + // << (g.unpairedDic.find(readName) != g.unpairedDic.end()) << endl; + // } + } + } + + int64_t lastFragPos = 0; + if (lp.msFrag.size() > 0) + lastFragPos = lp.msFrag.rbegin()->first; // 上一轮read最后到达的坐标, frag + for (auto &frag : p.msFrag) // 重叠的frag + { + const int64_t pos = frag.first; + if (pos > lastFragPos) + break; + if (lp.msFrag.find(pos) != lp.msFrag.end()) // 上一个任务里也有这个位点,两个任务在相同的点位上都有数据,则需要重新计算该点位 + { + auto &fragSet = lp.msFrag[pos]; + rmFragIdxAtPos(pos, &lp, &p); + for (auto &curFrag : frag.second) // 改变了lp当前位点的paired set + fragSet.insert(curFrag); + handleFrags(pos, fragSet, vRePotentialDup, &lp.msFragDupIdx); // 把结果放在上个任务里 + } + } +} + +/* 当所有任务结束后,global data里还有未处理的数据 */ +static void handleLastTask(SerailMarkDupArg *task, GlobalDataArg *gDataArg) +{ + auto &p = *task; + auto &g = *gDataArg; + + vector vRePotentialDup; + for (auto &unpair : p.unpairedDic) // 最后一个任务中没有找到匹配的pair + { + auto &readName = unpair.first; + auto &unpairEnd = unpair.second; + if (g.unpairedDic.find(readName) != g.unpairedDic.end()) // 在全局中找 + { + + auto &prePe = g.unpairedDic.at(readName); + modifyPairedEnds(unpairEnd, &prePe); + int64_t pos = prePe.posKey; + clearPairIdxAtPos(pos, &p); + auto &prePairSet = g.msPair[pos]; + prePairSet.insert(prePe); + mergeToPairSet(pos, p.msPair, &prePairSet); + handlePairs(pos, prePairSet, vRePotentialDup, &g.msPairDupIdx, &g.msPairOpticalDupIdx); + g.unpairedDic.erase(readName); + + // if (pos == 3547574) + // { + // cout << "here-2: " << pos << '\t' << prePairSet.size() << '\t' << readName << '\t' + // << (p.msPair.find(pos) != p.msPair.end()) << '\t' + // << (p.unpairedDic.find(readName) != p.unpairedDic.end()) << '\t' + // << (g.unpairedDic.find(readName) != g.unpairedDic.end()) << endl; + // } + } + else // 插入全局数据中 + { + int64_t pos = unpairEnd.posKey; + g.unpairedDic.insert(unpair); + mergeToPairSet(pos, p.msPair, &g.msPair[pos]); + // if (pos == 3547574) + // { + // cout << "here-4: " << pos << '\t' << g.msPair[pos].size() << '\t' << readName << '\t' + // << (p.msPair.find(pos) != p.msPair.end()) << '\t' + // << (p.unpairedDic.find(readName) != p.unpairedDic.end()) << '\t' + // << (g.unpairedDic.find(readName) != g.unpairedDic.end()) << endl; + // } + } + } + + // 处理剩余的 + for (auto &pe : g.msPair) + { + // if (pe.first == 3547574) + // { + // cout << "here-4: " << pe.first << '\t' << g.msPair[pe.first].size() << endl; + // } + handlePairs(pe.first, pe.second, vRePotentialDup, &g.msPairDupIdx, &g.msPairOpticalDupIdx); + } +} + +/* 功能函数,将冗余索引添加进结果中 */ +static void addIdxToSet(map> &taskDupIdx, set *resultSet) +{ + for (auto &idxSet : taskDupIdx) + { + // cout << idxSet.first << '\t' << idxSet.second.size() << endl; + for (auto idx : idxSet.second) + { + resultSet->insert(idx); + gData.test[idxSet.first].insert(idx); + } + } +} + +/* 功能函数,将冗余(包括光学冗余)索引添加进结果中 */ +static void addOpticalIdxToSet(map> &taskDupIdx, set *resultSet, unordered_set *opticalResult) +{ + for (auto &idxSet : taskDupIdx) + { + // cout << idxSet.first << '\t' << idxSet.second.size() << endl; + for (auto idx : idxSet.second) + { + resultSet->insert(idx); + opticalResult->insert(idx); + gData.test[idxSet.first].insert(idx); + } + } +} + +/* 将每轮任务得到的冗余index添加到全局结果里 */ +static void addTaskIdxToSet(SerailMarkDupArg *task, GlobalDataArg *gDataArg) +{ + auto &p = *task; + auto &g = *gDataArg; + addIdxToSet(p.msPairDupIdx, &g.dupIdx); + addOpticalIdxToSet(p.msPairOpticalDupIdx, &g.dupIdx, &g.opticalDupIdx); + addIdxToSet(p.msFragDupIdx, &g.dupIdx); +} + +/* 将所有任务结束后,剩余的冗余index添加到全局结果里 */ +static void addGlobalIdxToSet(GlobalDataArg *gDataArg) +{ + auto &g = *gDataArg; + addIdxToSet(g.msPairDupIdx, &g.dupIdx); + addOpticalIdxToSet(g.msPairOpticalDupIdx, &g.dupIdx, &g.opticalDupIdx); +} + +/* 串行处理数据,标记冗余 */ +static void serialMarkDups() +{ + tm_arr[5].acc_start(); + Timer::log_time("serial start"); + // 读取缓存初始化 + BamBufType inBamBuf(g_gArg.use_asyncio); + inBamBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem); + // BamBufType inBamBuf(false); + //inBamBuf.Init(g_inBamFp, g_inBamHeader, 10 * 1024 * 1024); + int64_t processedBamNum = 0; + SerailMarkDupArg *lastArgP = nullptr; + SerailMarkDupArg *sMdArg = nullptr; + while (inBamBuf.ReadStat() >= 0) + { + // 读取bam文件中的read + tm_arr[4].acc_start(); + size_t readNum = inBamBuf.ReadBam(); + tm_arr[4].acc_end(); + cout << "read num: " << readNum << endl; + lastArgP = sMdArg; + tm_arr[6].acc_start(); + sMdArg = new SerailMarkDupArg({processedBamNum, + inBamBuf.GetBamArr()}); + tm_arr[6].acc_end(); + tm_arr[0].acc_start(); + generateReadEnds(sMdArg); + tm_arr[0].acc_end(); + + tm_arr[1].acc_start(); + markdups(sMdArg); + tm_arr[1].acc_end(); + + if (lastArgP != nullptr) + { + tm_arr[2].acc_start(); + handleIntersectData(lastArgP, sMdArg, &gData); + addTaskIdxToSet(lastArgP, &gData); + tm_arr[2].acc_end(); + tm_arr[7].acc_start(); + delete lastArgP; // 清除用过的数据 + tm_arr[7].acc_end(); + } + inBamBuf.ClearAll(); // 清理上一轮读入的数据 + processedBamNum += readNum; + } + tm_arr[3].acc_start(); + // 处理剩下的全局数据 + handleLastTask(sMdArg, &gData); + addTaskIdxToSet(sMdArg, &gData); + addGlobalIdxToSet(&gData); + tm_arr[3].acc_end(); + + // for (auto &e : gData.test) + // { + // cout << e.first << '\t' << e.second.size() << endl; + // } + tm_arr[5].acc_end(); + // 统计所有冗余index数量 + cout << "dup num : " << gData.dupIdx.size() << endl; + + cout << "calc readend: " << tm_arr[0].acc_seconds_elapsed() << endl; + cout << "markdup : " << tm_arr[1].acc_seconds_elapsed() << endl; + cout << "handle tail : " << tm_arr[2].acc_seconds_elapsed() << endl; + cout << "handle last : " << tm_arr[3].acc_seconds_elapsed() << endl; + cout << "read bam : " << tm_arr[4].acc_seconds_elapsed() << endl; + cout << "new arg : " << tm_arr[6].acc_seconds_elapsed() << endl; + cout << "del arg : " << tm_arr[7].acc_seconds_elapsed() << endl; + cout << "all : " << tm_arr[5].acc_seconds_elapsed() << endl; + + Timer::log_time("serial end "); +} \ No newline at end of file diff --git a/src/sam/utils/read_ends.h b/src/sam/utils/read_ends.h index 0e50b0c..da5b829 100644 --- a/src/sam/utils/read_ends.h +++ b/src/sam/utils/read_ends.h @@ -34,11 +34,13 @@ struct PhysicalLocation /* 包含了所有read ends信息,如picard里边的 ReadEndsForMarkDuplicates*/ struct ReadEnds : PhysicalLocation { + static const int8_t F = 0, R = 1, FF = 2, FR = 3, RR = 4, RF = 5; + /* 保留一些bam记录中的数据 */ + bool read1FirstOfPair = true; /* ReadEnds中的成员变量 */ /** Little struct-like class to hold read pair (and fragment) end data for duplicate marking. */ - static const int8_t F = 0, R = 1, FF = 2, FR = 3, RR = 4, RF = 5; // int16_t libraryId; // 没用,不考虑多样本 - int8_t orientation; + int8_t orientation = -1; int32_t read1ReferenceIndex = -1; int32_t read1Coordinate = -1; int32_t read2ReferenceIndex = -1; @@ -85,7 +87,7 @@ struct ReadEnds : PhysicalLocation } /* 比较两个readends是否一样(有个冗余) */ - static bool AreComparableForDuplicates(ReadEnds &lhs, ReadEnds &rhs, bool compareRead2) + static bool AreComparableForDuplicates(const ReadEnds &lhs, const ReadEnds &rhs, bool compareRead2) { bool areComparable = true; areComparable = lhs.read1ReferenceIndex == rhs.read1ReferenceIndex && @@ -100,16 +102,46 @@ struct ReadEnds : PhysicalLocation } /* 比对方向是否正向 */ - bool IsForwardStrand() + bool IsPositiveStrand() const { return orientation == F; } /* pairend是否合适的比对上了 */ - bool IsPaired() + bool IsPaired() const { return read2ReferenceIndex != -1; } + + bool IsNegativeStrand() const + { + return orientation == R; + } + + // 比较函数 + bool operator < (const ReadEnds &o) const + { + int comp = read1ReferenceIndex - o.read1ReferenceIndex; + if (comp == 0) + comp = read1Coordinate - o.read1Coordinate; + if (comp == 0) + comp = orientation - o.orientation; + if (comp == 0) + comp = read2ReferenceIndex - o.read2ReferenceIndex; + if (comp == 0) + comp = read2Coordinate - o.read2Coordinate; + if (comp == 0) + comp = tile - o.tile; + if (comp == 0) + comp = x - o.x; + if (comp == 0) + comp - y - o.y; + if (comp == 0) + comp = (int)(read1IndexInFile - o.read1IndexInFile); + if (comp == 0) + comp = (int)(read2IndexInFile - o.read2IndexInFile); + return comp < 0; + } }; #endif \ No newline at end of file