Change some annotations

This commit is contained in:
zzh 2025-04-23 14:59:40 +08:00
parent b0b9efdc66
commit ab7da8a7d7
14 changed files with 375 additions and 375 deletions

View File

@ -11,7 +11,7 @@ using std::string;
using std::vector;
/*
*
*
*/
struct DuplicationMetrics {
/**
@ -57,15 +57,15 @@ struct DuplicationMetrics {
*/
uint64_t ESTIMATED_LIBRARY_SIZE = 0;
// 其他的统计数据
//
vector<double> CoverageMult;
// 比如在该位置4个冗余那么所有4个冗余的位置数量
// 44
MDMap DuplicateCountHist;
MDMap NonOpticalDuplicateCountHist;
MDMap OpticalDuplicatesCountHist;
// 没有冗余的位置总数量 for test
// for test
MDSet<int64_t> singletonReads;
MDSet<int64_t> dupReads; // for test
MDSet<int64_t> bestReads;

View File

@ -1,6 +1,6 @@
/*
Description:
bambambam
bambambam
Copyright : All right reserved by ICT
@ -26,26 +26,26 @@ Date : 2023/10/23
namespace nsgv {
MarkDupsArg gMdArg; // 参数
std::vector<ReadNameParser> gNameParsers; // 每个线程一个read name parser
samFile *gInBamFp; // 输入的bam文件
sam_hdr_t *gInBamHeader; // 输入的bam文件头信息
samFile *gOutBamFp; // 输出文件, sam或者bam格式
sam_hdr_t *gOutBamHeader; // 输出文件的header
DuplicationMetrics gMetrics; // 统计信息
MarkDupsArg gMdArg; //
std::vector<ReadNameParser> gNameParsers; // read name parser
samFile *gInBamFp; // bam
sam_hdr_t *gInBamHeader; // bam
samFile *gOutBamFp; // , sambam
sam_hdr_t *gOutBamHeader; // header
DuplicationMetrics gMetrics; //
DupResult gDupRes;
PipelineArg gPipe(&gDupRes);
}; // namespace nsgv
// 字节缓冲区
//
struct ByteBuf {
uint8_t *buf = nullptr;
int size = 0; // 当前使用量
int capacity = 0; // 最大容量
int size = 0; //
int capacity = 0; //
};
/*
*
*
*/
static string getFileExtension(const string &filename) {
auto last_dot = filename.find_last_of('.');
@ -58,25 +58,25 @@ static string getFileExtension(const string &filename) {
// entrance of mark duplicates
int MarkDuplicates() {
PROF_START(whole_process);
/* 初始化一些参数和变量*/
/* */
nsgv::gNameParsers.resize(nsgv::gMdArg.NUM_THREADS);
for (auto &parser : nsgv::gNameParsers)
parser.SetReadNameRegex(nsgv::gMdArg.READ_NAME_REGEX); // 用来解析read name中的tilexy信息
parser.SetReadNameRegex(nsgv::gMdArg.READ_NAME_REGEX); // read nametilexy
/* 打开输入bam文件 */
/* bam */
nsgv::gInBamFp = sam_open_format(nsgv::gMdArg.INPUT_FILE.c_str(), "r", nullptr);
if (!nsgv::gInBamFp) {
spdlog::error("[{}] load sam/bam file failed.\n", __func__);
return -1;
}
hts_set_opt(nsgv::gInBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE);
nsgv::gInBamHeader = sam_hdr_read(nsgv::gInBamFp); // 读取header
// 获取样本名称(libraryId)
nsgv::gInBamHeader = sam_hdr_read(nsgv::gInBamFp); // header
// (libraryId)
nsgv::gMetrics.LIBRARY = sam_hdr_line_name(nsgv::gInBamHeader, "RG", 0);
/* 利用线程池对输入输出文件进行读写 */
htsThreadPool htsPoolRead = {NULL, 0}; // 多线程读取创建线程池
htsThreadPool htsPoolWrite = {NULL, 0}; // 读写用不同的线程池
/* */
htsThreadPool htsPoolRead = {NULL, 0}; //
htsThreadPool htsPoolWrite = {NULL, 0}; //
htsPoolRead.pool = hts_tpool_init(nsgv::gMdArg.NUM_THREADS);
htsPoolWrite.pool = hts_tpool_init(nsgv::gMdArg.NUM_THREADS);
if (!htsPoolRead.pool || !htsPoolWrite.pool) {
@ -86,12 +86,12 @@ int MarkDuplicates() {
}
hts_set_opt(nsgv::gInBamFp, HTS_OPT_THREAD_POOL, &htsPoolRead);
/* 冗余检查和标记 */
/* */
PROF_START(markdup_all);
PipelineMarkDups();
PROF_END(gprof[GP_markdup_all], markdup_all);
/* 标记冗余, 将处理后的结果写入文件 */
/* , */
char modeout[12] = "wb";
sam_open_mode(modeout + 1, nsgv::gMdArg.OUTPUT_FILE.c_str(), NULL);
nsgv::gOutBamFp = sam_open(nsgv::gMdArg.OUTPUT_FILE.c_str(), modeout);
@ -100,14 +100,14 @@ int MarkDuplicates() {
return -1;
}
nsgv::gOutBamHeader = sam_hdr_dup(nsgv::gInBamHeader);
// 修改输出文件的header
// header
sam_hdr_add_line(nsgv::gOutBamHeader, "PG", "ID", nsgv::gMdArg.PROGRAM_RECORD_ID.c_str(), "VN", FASTDUP_VERSION,
"CL", nsgv::gMdArg.CLI_STR.c_str(), NULL);
// 用同样的线程数量处理输出文件
//
hts_set_opt(nsgv::gOutBamFp, HTS_OPT_BLOCK_SIZE, BAM_BLOCK_SIZE);
hts_set_opt(nsgv::gOutBamFp, HTS_OPT_THREAD_POOL, &htsPoolWrite);
sam_close(nsgv::gInBamFp); // 重新打开bam文件
sam_close(nsgv::gInBamFp); // bam
nsgv::gInBamFp = sam_open_format(nsgv::gMdArg.INPUT_FILE.c_str(), "r", nullptr);
if (!nsgv::gInBamFp) {
spdlog::error("[{}] load sam/bam file failed.\n", __func__);
@ -122,8 +122,8 @@ int MarkDuplicates() {
sam_close(nsgv::gInBamFp);
return -1;
}
// 输出index文件
string indexFn = nsgv::gMdArg.OUTPUT_FILE + ".bai"; // min_shift = 0 bai格式
// index
string indexFn = nsgv::gMdArg.OUTPUT_FILE + ".bai"; // min_shift = 0 bai
if ("sam" == getFileExtension(nsgv::gMdArg.OUTPUT_FILE) || !nsgv::gMdArg.CREATE_INDEX) {
indexFn = "";
}
@ -141,7 +141,7 @@ int MarkDuplicates() {
}
}
// 读取输入文件标记冗余并输出
//
BamBufType inBuf(nsgv::gMdArg.DUPLEX_IO);
inBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gMdArg.MAX_MEM);
@ -190,13 +190,13 @@ int MarkDuplicates() {
isOpticalDup = false;
isInDuplicateSet = false;
// 删除原来的duplicate tag
// duplicate tag
if (nsgv::gMdArg.CLEAR_DT) {
uint8_t *oldTagVal = bam_aux_get(bw->b, nsgv::gMdArg.DUPLICATE_TYPE_TAG.c_str());
if (oldTagVal != NULL) bam_aux_del(bw->b, oldTagVal);
}
// 统计信息
//
if (bw->GetReadUnmappedFlag()) {
++nsgv::gMetrics.UNMAPPED_READS;
} else if (bw->IsSecondaryOrSupplementary()) {
@ -207,7 +207,7 @@ int MarkDuplicates() {
++nsgv::gMetrics.READ_PAIRS_EXAMINED; // will need to be divided by 2 at the end
}
/* 判断是否冗余 */
/* */
if (bamIdx == dupIdx) {
++realDupSize; // for test
isDup = true;
@ -217,7 +217,7 @@ int MarkDuplicates() {
duplicateSetSize = dupIdx.dupSet;
}
// 为了防止小内存运行的时候有重复的dupidx这时候duprepIdxdupSetSize可能会有不同
// dupidxduprepIdxdupSetSize
while ((dupIdx = dupIdxQue.Pop()) == bamIdx);
if (opticalIdx == bamIdx)
isOpticalDup = true;
@ -227,7 +227,7 @@ int MarkDuplicates() {
isOpticalDup = true;
}
// 添加冗余标识
//
bw->SetDuplicateReadFlag(true);
uint8_t *oldTagVal = bam_aux_get(bw->b, nsgv::gMdArg.DUPLICATE_TYPE_TAG.c_str());
@ -242,7 +242,7 @@ int MarkDuplicates() {
nsgv::gMdArg.DUPLICATE_TYPE_LIBRARY.size() + 1,
(const uint8_t *)nsgv::gMdArg.DUPLICATE_TYPE_LIBRARY.c_str());
// 计算统计信息
//
if (!bw->IsSecondaryOrSupplementary() && !bw->GetReadUnmappedFlag()) {
// Update the duplication metrics
if (!bw->GetReadPairedFlag() || bw->GetMateUnmappedFlag()) {
@ -282,7 +282,7 @@ int MarkDuplicates() {
(const uint8_t *)&duplicateSetSize);
}
}
// 每个read都要写到output只是添加标识或者删掉这些冗余record
// readoutputrecord
++bamIdx;
if (isDup && nsgv::gMdArg.REMOVE_DUPLICATES) {
continue;
@ -312,7 +312,7 @@ int MarkDuplicates() {
}
bam_destroy1(bp);
// 计算统计信息
//
nsgv::gMetrics.READ_PAIRS_EXAMINED /= 2;
nsgv::gMetrics.READ_PAIR_DUPLICATES /= 2;
for (auto &arr : nsgv::gDupRes.opticalDupIdxArr) nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES += arr.size();
@ -329,7 +329,7 @@ int MarkDuplicates() {
}
calculateRoiHistogram(nsgv::gMetrics);
// 将统计信息写到文件里
//
if (!nsgv::gMdArg.METRICS_FILE.empty()) {
ofstream ofsM(nsgv::gMdArg.METRICS_FILE);
ofsM << "## StringHeader" << endl;
@ -364,7 +364,7 @@ int MarkDuplicates() {
return -1;
}
/* 关闭文件收尾清理 */
/* */
sam_close(nsgv::gOutBamFp);
sam_close(nsgv::gInBamFp);

View File

@ -1,5 +1,5 @@
/*
Description: Markduplicate
Description: Markduplicate
Copyright : All right reserved by ICT
@ -40,7 +40,7 @@ enum ValidationStringency {
*/
enum DuplicateTaggingPolicy { DontTag, OpticalOnly, All };
/* 排序的方式 */
/* */
enum SortOrder {
unsorted,
queryname,
@ -49,14 +49,14 @@ enum SortOrder {
unknown
};
/* 计算reads分数的方式比那个read得分更高 */
/* readsread */
enum ScoringStrategy { SUM_OF_BASE_QUALITIES, TOTAL_MAPPED_REFERENCE_LENGTH, RANDOM };
/* 索引文件的格式 bai或者csi */
/* baicsi */
enum IndexFormat { BAI, CSI };
} // namespace nsmd
/* markduplicate 需要的参数*/
/* markduplicate */
struct MarkDupsArg {
string INPUT_FILE; // input bam filename
@ -64,9 +64,9 @@ struct MarkDupsArg {
int NUM_THREADS = 1;
size_t MAX_MEM = ((size_t)1) << 30; // // 最小1G
size_t MAX_MEM = ((size_t)1) << 30; // // 1G
bool DUPLEX_IO = true; // 同时读写
bool DUPLEX_IO = true; //
/**
* The optional attribute in SAM/BAM/CRAM files used to store the duplicate type.
@ -156,7 +156,7 @@ struct MarkDupsArg {
optional = true) */
string MOLECULAR_IDENTIFIER_TAG = "";
/* 继承自 AbstractMarkDuplicatesCommandLineProgram 的参数*/
/* AbstractMarkDuplicatesCommandLineProgram */
/* "File to write duplication metrics to" */
string METRICS_FILE;
@ -196,7 +196,7 @@ struct MarkDupsArg {
optional = true */
vector<string> COMMENT;
/* 继承自 AbstractOpticalDuplicateFinderCommandLineProgram 的参数 */
/* AbstractOpticalDuplicateFinderCommandLineProgram */
/* "MarkDuplicates can use the tile and cluster positions to estimate the rate of optical duplication " +
"in addition to the dominant source of duplication, PCR, to provide a more accurate estimation of library
@ -227,7 +227,7 @@ struct MarkDupsArg {
completely disable this check, " + "set the value to -1." */
long MAX_OPTICAL_DUPLICATE_SET_SIZE = DEFAULT_MAX_DUPLICATE_SET_SIZE;
/* 继承自 CommandLineProgram 的参数*/
/* CommandLineProgram */
/* "Whether to suppress job-summary info on System.err.", common = true */
bool QUIET = false;
@ -256,9 +256,9 @@ struct MarkDupsArg {
/* Add PG tag to each read in a SAM or BAM (PGTagArgumentCollection)*/
bool ADD_PG_TAG_TO_READS = true;
// 命令行字符串
//
string CLI_STR;
// 开始运行时间
//
string START_TIME;
};

View File

@ -30,7 +30,7 @@ extern DuplicationMetrics gMetrics;
};
/*
* read
* read
*/
int16_t computeDuplicateScore(BamWrap &bw) {
int16_t score = 0;
@ -71,7 +71,7 @@ int16_t computeDuplicateScore(BamWrap &bw) {
/*
* Builds a read ends object that represents a single read.
* read
* read
*/
void buildReadEnds(BamWrap &bw, int64_t index, ReadNameParser &rnParser, ReadEnds *pKey) {
auto &k = *pKey;
@ -93,11 +93,11 @@ void buildReadEnds(BamWrap &bw, int64_t index, ReadNameParser &rnParser, ReadEnd
else
nsgv::gMdArg.CHECK_OPTICAL_DUP = false;
// cout << k.tile << ' ' << k.x << ' ' << k.y << endl;
// 计算位置key
// key
k.posKey = BamWrap::bam_global_pos(k.read1ReferenceIndex, k.read1Coordinate); // << 1 | k.orientation;
}
/* 对找到的pairend read end添加一些信息 */
/* pairend read end */
void modifyPairedEnds(const ReadEnds &fragEnd, ReadEnds *pPairedEnds) {
auto &pairedEnds = *pPairedEnds;
@ -202,7 +202,7 @@ static void findOpticalDuplicates(vector<const ReadEnds *> &readEndsArr, const R
if (currentLoc == pBestRe)
keeperIndex = i;
if (currentLoc->tile != ReadEnds::NO_VALUE) {
int key = currentLoc->tile; // 只处理一个样本所以只有一个read group
int key = currentLoc->tile; // read group
tileRGmap[key].push_back(i);
}
opticalDistanceRelationGraph.addNode(i);
@ -323,7 +323,7 @@ static int checkOpticalDuplicates(vector<const ReadEnds *> &readEndsArr, const R
}
/**
*
*
*/
void trackOpticalDuplicates(vector<const ReadEnds *> &readEndsArr, const ReadEnds *pBestRe) {
bool hasFR = false, hasRF = false;
@ -358,7 +358,7 @@ void trackOpticalDuplicates(vector<const ReadEnds *> &readEndsArr, const ReadEnd
nOpticalDup = checkOpticalDuplicates(readEndsArr, pBestRe);
}
// 统计信息trackDuplicateCounts
// trackDuplicateCounts
++nsgv::gMetrics.DuplicateCountHist[readEndsArr.size()];
if (readEndsArr.size() > nOpticalDup)
++nsgv::gMetrics.NonOpticalDuplicateCountHist[readEndsArr.size() - nOpticalDup];

View File

@ -14,19 +14,19 @@ using std::unordered_map;
using std::unordered_set;
using std::vector;
/* 前向声明 */
/* */
class BamWrap;
class ReadEnds;
class ReadNameParser;
/*
* optical duplicationgraph
* optical duplicationgraph
*/
template <class Node>
struct Graph { // set
vector<Node> nodes; // 图中的结点
struct Graph { // set
vector<Node> nodes; //
unordered_map<Node, int> nodeIdxMap;
unordered_map<int, unordered_set<int>> neighbors; // 邻接列表
unordered_map<int, unordered_set<int>> neighbors; //
int addNode(const Node &singleton) {
int idx = -1;
@ -98,18 +98,18 @@ struct Graph { // 用set
};
/*
* read
* read
*/
int16_t computeDuplicateScore(BamWrap &bw);
/*
* Builds a read ends object that represents a single read.
* read
* read
*/
void buildReadEnds(BamWrap &bw, int64_t index, ReadNameParser &rnParser, ReadEnds *pKey);
/*
* pairend read end
* pairend read end
*/
void modifyPairedEnds(const ReadEnds &fragEnd, ReadEnds *pPairedEnds);
@ -118,7 +118,7 @@ void modifyPairedEnds(const ReadEnds &fragEnd, ReadEnds *pPairedEnds);
* in fact optical duplicates, and stores the data in the instance level histogram.
* Additionally sets the transient isOpticalDuplicate flag on each read end that is
* identified as an optical duplicate.
*
*
*/
void trackOpticalDuplicates(vector<const ReadEnds *> &readEndsArr, const ReadEnds *pBestRe);

View File

@ -22,16 +22,16 @@ using namespace std;
namespace nsgv {
extern MarkDupsArg gMdArg; // 用来测试性能
extern samFile *gInBamFp; // 输入的bam文件
extern sam_hdr_t *gInBamHeader; // 输入的bam文件头信息
extern DuplicationMetrics gMetrics; // 统计信息
extern MarkDupsArg gMdArg; //
extern samFile *gInBamFp; // bam
extern sam_hdr_t *gInBamHeader; // bam
extern DuplicationMetrics gMetrics; //
extern vector<ReadNameParser> gNameParsers;
extern DupResult gDupRes;
extern PipelineArg gPipe;
}; // namespace nsgv
/* 处理一组pairendreadends标记冗余, 这个函数需要串行运行因为需要做一些统计*/
/* pairendreadends, */
static void markDupsForPairs(vector<const ReadEnds *> &vpRe, DPSet<DupInfo> *dupIdx, MDSet<int64_t> *opticalDupIdx,
DPSet<DupInfo> *repIdx, MDSet<int64_t> *notDupIdx = nullptr,
MDSet<int64_t> *notOpticalDupIdx = nullptr, MDSet<int64_t> *notRepIdx = nullptr) {
@ -41,7 +41,7 @@ static void markDupsForPairs(vector<const ReadEnds *> &vpRe, DPSet<DupInfo> *dup
int maxScore = 0;
const ReadEnds *pBest = nullptr;
/** All read ends should have orientation FF, FR, RF, or RR **/
for (auto pe : vpRe) { // 找分数最高的readend
for (auto pe : vpRe) { // readend
if (pe->score > maxScore || pBest == nullptr) {
maxScore = pe->score;
pBest = pe;
@ -53,7 +53,7 @@ static void markDupsForPairs(vector<const ReadEnds *> &vpRe, DPSet<DupInfo> *dup
notDupIdx->insert(pBest->read2IndexInFile);
}
if (nsgv::gMdArg.CHECK_OPTICAL_DUP) { // 检查光学冗余
if (nsgv::gMdArg.CHECK_OPTICAL_DUP) { //
// trackOpticalDuplicates
vector<const ReadEnds *> prevOpticalRe;
if (notOpticalDupIdx != nullptr) {
@ -64,7 +64,7 @@ static void markDupsForPairs(vector<const ReadEnds *> &vpRe, DPSet<DupInfo> *dup
}
}
trackOpticalDuplicates(vpRe, pBest);
// 由于重叠问题可能会更新数据
//
if (notOpticalDupIdx != nullptr) {
for (auto pe : prevOpticalRe) {
if (!pe->isOpticalDuplicate) {
@ -74,13 +74,13 @@ static void markDupsForPairs(vector<const ReadEnds *> &vpRe, DPSet<DupInfo> *dup
}
}
}
for (auto pe : vpRe) { // 对非best read标记冗余
if (pe != pBest) { // best
dupIdx->insert(DupInfo(pe->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // 添加read1
for (auto pe : vpRe) { // best read
if (pe != pBest) { // best
dupIdx->insert(DupInfo(pe->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // read1
if (pe->read2IndexInFile != pe->read1IndexInFile)
dupIdx->insert(DupInfo(pe->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size())); // read2,
// 注意这里代表是read1的索引
// 检查是否optical dup
// read1
// optical dup
if (pe->isOpticalDuplicate && opticalDupIdx != nullptr) {
opticalDupIdx->insert(pe->read1IndexInFile);
if (pe->read2IndexInFile != pe->read1IndexInFile)
@ -88,7 +88,7 @@ static void markDupsForPairs(vector<const ReadEnds *> &vpRe, DPSet<DupInfo> *dup
}
}
}
// 在输出的bam文件中添加tag, best作为dupset的代表
// bamtag, bestdupset
if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS) {
repIdx->insert(DupInfo(pBest->read1IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size()));
repIdx->insert(DupInfo(pBest->read2IndexInFile, pBest->read1IndexInFile, (int16_t)vpRe.size()));
@ -104,7 +104,7 @@ static void markDupsForPairs(vector<const ReadEnds *> &vpRe, DPSet<DupInfo> *dup
}
}
/* 处理一组非pairedreadends标记冗余 */
/* pairedreadends */
static void markDupsForFrags(vector<const ReadEnds *> &vpRe, bool containsPairs, DPSet<DupInfo> *dupIdx,
MDSet<int64_t> *notDupIdx = nullptr) {
if (containsPairs) {
@ -133,28 +133,28 @@ static void markDupsForFrags(vector<const ReadEnds *> &vpRe, bool containsPairs,
}
}
/* 找到与readend pos相等的所有readend */
/* readend posreadend */
static void getEqualRE(const ReadEnds &re, vector<ReadEnds> &src, vector<ReadEnds> *dst) {
auto range = std::equal_range(src.begin(), src.end(), re, ReadEnds::CorLittleThan); // 只比对位点
auto range = std::equal_range(src.begin(), src.end(), re, ReadEnds::CorLittleThan); //
dst->insert(dst->end(), range.first, range.second);
}
#define LOWER_BOUND(tid, nthread, ndata) ((tid) * (ndata) / (nthread))
#define UPPER_BOUND(tid, nthread, ndata) ((tid + 1) * (ndata) / (nthread))
/* 处理pairs */
/* pairs */
static void processPairs(vector<ReadEnds> &readEnds, DPSet<DupInfo> *dupIdx, MDSet<int64_t> *opticalDupIdx,
DPSet<DupInfo> *repIdx, MDSet<int64_t> *notDupIdx = nullptr,
MDSet<int64_t> *notOpticalDupIdx = nullptr, MDSet<int64_t> *notRepIdx = nullptr) {
// return;
vector<const ReadEnds *> vpCache; // 有可能是冗余的reads
vector<const ReadEnds *> vpCache; // reads
const ReadEnds *pReadEnd = nullptr;
for (auto &re : readEnds) {
if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) // 跟前一个一样
vpCache.push_back(&re); // 处理一个潜在的冗余组
if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) //
vpCache.push_back(&re); //
else {
markDupsForPairs(vpCache, dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx,
notRepIdx); // 不一样
notRepIdx); //
vpCache.clear();
vpCache.push_back(&re);
pReadEnd = &re;
@ -163,11 +163,11 @@ static void processPairs(vector<ReadEnds> &readEnds, DPSet<DupInfo> *dupIdx, MDS
markDupsForPairs(vpCache, dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx);
}
/* 处理frags */
/* frags */
static void processFrags(vector<ReadEnds> &readEnds, DPSet<DupInfo> *dupIdx, MDSet<int64_t> *notDupIdx = nullptr) {
bool containsPairs = false;
bool containsFrags = false;
vector<const ReadEnds *> vpCache; // 有可能是冗余的reads
vector<const ReadEnds *> vpCache; // reads
const ReadEnds *pReadEnd = nullptr;
for (auto &re : readEnds) {
if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, false)) {
@ -191,7 +191,7 @@ static void processFrags(vector<ReadEnds> &readEnds, DPSet<DupInfo> *dupIdx, MDS
}
/* 获取交叉部分的数据 */
/* */
static inline void getIntersectData(vector<ReadEnds> &leftArr, vector<ReadEnds> &rightArr, vector<ReadEnds> *dst,
bool isPairCmp = false) {
if (leftArr.empty() || rightArr.empty()) {
@ -204,7 +204,7 @@ static inline void getIntersectData(vector<ReadEnds> &leftArr, vector<ReadEnds>
while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx - leftSpan], rightArr[rightStartIdx], isPairCmp)) {
leftSpan += 1;
if (leftSpan > leftEndIdx) { // 上一个的范围被下一个全部包围了可能会有bug上上个也与下一个有交集呢
if (leftSpan > leftEndIdx) { // bug
leftSpan = leftArr.size() - 1;
break;
}
@ -212,7 +212,7 @@ static inline void getIntersectData(vector<ReadEnds> &leftArr, vector<ReadEnds>
while (!ReadEnds::ReadLittleThan(leftArr[leftEndIdx], rightArr[rightSpan], isPairCmp)) {
rightSpan += 1;
if (rightSpan == rightArr.size() - 1) // 同上可能会有bug
if (rightSpan == rightArr.size() - 1) // bug
break;
}
dst->insert(dst->end(), leftArr.end() - leftSpan, leftArr.end());
@ -223,7 +223,7 @@ static inline void getIntersectData(vector<ReadEnds> &leftArr, vector<ReadEnds>
std::sort(dst->begin(), dst->end(), ReadEnds::FragLittleThan);
}
/* frags重叠部分的dup idx放进数据中 */
/* fragsdup idx */
static inline void refreshFragDupIdx(DPSet<DupInfo> &dupIdx, MDSet<int64_t> &notDupIdx, MarkDupDataArg *lastArg,
MarkDupDataArg *curArg) {
auto &lp = *lastArg;
@ -259,26 +259,26 @@ static void mtGenerateReadEnds(void *data, long idx, int tid) {
PROF_START(gen);
size_t start_id = LOWER_BOUND(idx, nThread, bams.size());
size_t end_id = UPPER_BOUND(idx, nThread, bams.size());
for (size_t i = start_id; i < end_id; ++i) { // 循环处理每个read
for (size_t i = start_id; i < end_id; ++i) { // read
BamWrap *bw = bams[i];
const int64_t bamIdx = bamStartIdx + i;
if (bw->GetReadUnmappedFlag()) {
if (bw->b->core.tid == -1)
// When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort).
break;
} else if (!bw->IsSecondaryOrSupplementary()) { // 是主要比对
} else if (!bw->IsSecondaryOrSupplementary()) { //
ReadEnds fragEnd;
buildReadEnds(*bw, bamIdx, rnParser, &fragEnd);
frags.push_back(fragEnd); // 添加进frag集合
if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) { // pairend而且互补的read也比对上了
frags.push_back(fragEnd); // frag
if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) { // pairendread
string key = bw->query_name();
if (unpairedDic.find(key) == unpairedDic.end()) {
unpairedDic[key] = {taskSeq, fragEnd};
} else { // 找到了pairend
} else { // pairend
auto &pairedEnds = unpairedDic.at(key).unpairedRE;
modifyPairedEnds(fragEnd, &pairedEnds);
pairs.push_back(pairedEnds);
unpairedDic.erase(key); // 删除找到的pairend
unpairedDic.erase(key); // pairend
}
}
}
@ -302,8 +302,8 @@ static void doGenRE(PipelineArg &pipeArg) {
const int numThread = pipeArg.numThread;
kt_for(numThread, mtGenerateReadEnds, &pipeArg, numThread);
// 用来在genRE阶段计算一些Sort阶段应该计算的数据保持每个step用时更平衡
// 轮询每个线程中未找到匹配的read找到匹配的
// genRESortstep
// read
genREData.unpairedDic.clear();
vector<ReadEnds> &pairs = genREData.pairsArr[numThread];
pairs.clear();
@ -316,11 +316,11 @@ static void doGenRE(PipelineArg &pipeArg) {
const ReadEnds &fragEnd = val.second.unpairedRE;
if (genREData.unpairedDic.find(key) == genREData.unpairedDic.end()) {
genREData.unpairedDic[key] = {readData.taskSeq, fragEnd};
} else { // 找到了pairend
} else { // pairend
auto &pairedEnds = genREData.unpairedDic.at(key).unpairedRE;
modifyPairedEnds(fragEnd, &pairedEnds);
pairs.push_back(pairedEnds);
genREData.unpairedDic.erase(key); // 删除找到的pairend
genREData.unpairedDic.erase(key); // pairend
}
}
}
@ -367,11 +367,11 @@ static void doMarkDup(PipelineArg &pipeArg) {
sortData.dataPtr = tmpPtr;
SortMarkData &smd = *(SortMarkData *)mdData.dataPtr;
// 先处理 pair
// pair
PROF_START(markdup_pair);
processPairs(smd.pairs, &mdData.pairDupIdx, &mdData.pairOpticalDupIdx, &mdData.pairRepIdx);
PROF_END(gprof[GP_markdup_pair], markdup_pair);
// 再处理frag
// frag
PROF_START(markdup_frag);
processFrags(smd.frags, &mdData.fragDupIdx);
PROF_END(gprof[GP_markdup_frag], markdup_frag);
@ -418,7 +418,7 @@ static void refreshMarkDupData(DPSet<DupInfo> &dupIdx, MDSet<int64_t> &opticalDu
refreshNotDupIdx(repIdx, p.pairRepIdx);
}
// 处理相邻数据块之间重叠的部分
//
static void processIntersectFragPairs(MarkDupData &lp, MarkDupData &cp) {
SortMarkData &lsm = *(SortMarkData *)lp.dataPtr;
SortMarkData &csm = *(SortMarkData *)cp.dataPtr;
@ -430,7 +430,7 @@ static void processIntersectFragPairs(MarkDupData &lp, MarkDupData &cp) {
MDSet<int64_t> notOpticalDupIdx;
MDSet<int64_t> notDupIdx;
MDSet<int64_t> notRepIdx;
// 先处理重叠的frags
// frags
getIntersectData(lsm.frags, csm.frags, &reArr);
processFrags(reArr, &dupIdx, &notDupIdx);
refreshDupIdx(dupIdx, lp.fragDupIdx);
@ -438,34 +438,34 @@ static void processIntersectFragPairs(MarkDupData &lp, MarkDupData &cp) {
refreshNotDupIdx(notDupIdx, lp.fragDupIdx);
refreshNotDupIdx(notDupIdx, cp.fragDupIdx);
// 再处理重叠的pairs
// pairs
reArr.clear();
dupIdx.clear();
notDupIdx.clear();
getIntersectData(lsm.pairs, csm.pairs, &reArr, true);
processPairs(reArr, &dupIdx, &opticalDupIdx, &repIdx, &notDupIdx, &notOpticalDupIdx, &notRepIdx);
refreshMarkDupData(dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx, cp, lp); // 放在cp因为后面global里可能有相同的dup防止多次出现
refreshMarkDupData(dupIdx, opticalDupIdx, repIdx, notDupIdx, notOpticalDupIdx, notRepIdx, cp, lp); // cpglobaldup
}
// 在相邻的数据块之间寻找未匹配的readends, 找到匹配的放到lp
// readends, lp
static void findUnpairedInDatas(MarkDupData &lp, MarkDupData &cp) {
auto &interPairedData = lp.ckeyReadEndsMap;
SortMarkData &lsm = *(SortMarkData *)lp.dataPtr;
SortMarkData &csm = *(SortMarkData *)cp.dataPtr;
for (auto itr = lsm.unpairedDic.begin(); itr != lsm.unpairedDic.end(); ) { // 遍历上一个任务中的每个未匹配的read
for (auto itr = lsm.unpairedDic.begin(); itr != lsm.unpairedDic.end(); ) { // read
auto &lastUnpair = *itr;
auto &readName = lastUnpair.first;
auto &lastUnpairInfo = lastUnpair.second;
auto lastRE = lastUnpairInfo.unpairedRE; // 未匹配的read end
if (csm.unpairedDic.find(readName) != csm.unpairedDic.end()) { // 在当前这个任务里找到了这个未匹配的read
auto lastRE = lastUnpairInfo.unpairedRE; // read end
if (csm.unpairedDic.find(readName) != csm.unpairedDic.end()) { // read
auto &curUnpairInfo = csm.unpairedDic[readName];
auto &curRE = curUnpairInfo.unpairedRE;
modifyPairedEnds(curRE, &lastRE); // lastRE当做找到匹配后完整的ReadEnds
modifyPairedEnds(curRE, &lastRE); // lastREReadEnds
CalcKey ck(lastRE);
auto &pairArr = interPairedData[ck];
pairArr.push_back(lastRE);
// dict中清除配对后的readends
// dictreadends
csm.unpairedDic.erase(readName);
itr = lsm.unpairedDic.erase(itr);
} else {
@ -474,23 +474,23 @@ static void findUnpairedInDatas(MarkDupData &lp, MarkDupData &cp) {
}
}
// globallp中寻找未匹配的readends, 找到匹配的放到global
// globallpreadends, global
static void findUnpairedInGlobal(IntersectData &g, MarkDupData &lp) {
auto &interPairedData = g.ckeyReadEndsMap;
SortMarkData &lsm = *(SortMarkData *)lp.dataPtr;
for (auto itr = lsm.unpairedDic.begin(); itr != lsm.unpairedDic.end();) { // 遍历上一个任务中的每个未匹配的read
for (auto itr = lsm.unpairedDic.begin(); itr != lsm.unpairedDic.end();) { // read
auto &lastUnpair = *itr;
auto &readName = lastUnpair.first;
auto &lastUnpairInfo = lastUnpair.second;
auto lastRE = lastUnpairInfo.unpairedRE; // 未匹配的read end
if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // global里找到了这个未匹配的read
auto lastRE = lastUnpairInfo.unpairedRE; // read end
if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // globalread
auto &gUnpairInfo = g.unpairedDic[readName];
auto &gRE = gUnpairInfo.unpairedRE;
modifyPairedEnds(lastRE, &gRE); // gRE当做找到匹配后完整的ReadEnds
modifyPairedEnds(lastRE, &gRE); // gREReadEnds
CalcKey ck(gRE);
auto &pairArr = interPairedData[ck];
pairArr.push_back(gRE);
// dict中清除配对后的readends
// dictreadends
g.unpairedDic.erase(readName);
itr = lsm.unpairedDic.erase(itr);
} else {
@ -528,10 +528,10 @@ static void doIntersect(PipelineArg &pipeArg) {
SortMarkData &lsm = *(SortMarkData *)lp.dataPtr;
SortMarkData &csm = *(SortMarkData *)cp.dataPtr;
// 处理相邻数据块之间重叠的部分
//
processIntersectFragPairs(lp, cp);
// 检查确保lpnp之间没有数据交叉
// lpnp
int64_t lastLeft = INT64_MIN, lastRight = INT64_MAX, curLeft = INT64_MAX, curRight = INT64_MAX;
if (lsm.pairs.size() > 0) {
lastLeft = lsm.frags[0].Left();
@ -549,21 +549,21 @@ static void doIntersect(PipelineArg &pipeArg) {
}
g.rightPos = lastRight;
findUnpairedInDatas(lp, cp); // 找到的匹配放到lp
findUnpairedInGlobal(g, cp); // cp中未匹配的放到global里保存
findUnpairedInDatas(lp, cp); // lp
findUnpairedInGlobal(g, cp); // cpglobal
// 处理lp中的新找到的匹配
// lp
TaskSeqDupInfo t;
for (auto itr = lp.ckeyReadEndsMap.begin(); itr != lp.ckeyReadEndsMap.end();) {
auto &ckVal = *itr;
auto &ck = ckVal.first;
auto &pairArr = ckVal.second;
getEqualRE(pairArr[0], lsm.pairs, &pairArr); // 如果不在计算范围内会放在global
if (ck.Right() <= lastRight) { // 必须在当前数据块的范围内, 才进行处理
if (ck.Left() >= curLeft) { // 在交叉的范围内才去加上这些在cp中的数据
getEqualRE(pairArr[0], lsm.pairs, &pairArr); // global
if (ck.Right() <= lastRight) { // ,
if (ck.Left() >= curLeft) { // cp
getEqualRE(pairArr[0], csm.pairs, &pairArr);
}
// global里找一找ck
// globalck
auto gitr = g.ckeyReadEndsMap.find(ck);
if (gitr != g.ckeyReadEndsMap.end()) {
auto &gPairArr = gitr->second;
@ -578,7 +578,7 @@ static void doIntersect(PipelineArg &pipeArg) {
}
}
// 处理找到匹配的global数据
// global
for (auto itr = g.ckeyReadEndsMap.begin(); itr != g.ckeyReadEndsMap.end();) {
auto &ckVal = *itr;
auto &ck = ckVal.first;
@ -586,7 +586,7 @@ static void doIntersect(PipelineArg &pipeArg) {
if (ck.Left() >= lastLeft) {
getEqualRE(pairArr[0], lsm.pairs, &pairArr);
}
if (ck.Right() <= lastRight) { // 只有在这个范围内对应位点的所有reads才完全都包含了
if (ck.Right() <= lastRight) { // reads
sort(pairArr.begin(), pairArr.end(), ReadEnds::PairLittleThan);
processPairs(pairArr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.notDupIdx, &t.notOpticalDupIdx, &t.notRepIdx);
itr = g.ckeyReadEndsMap.erase(itr);
@ -595,16 +595,16 @@ static void doIntersect(PipelineArg &pipeArg) {
}
}
// 剩余的在lp中没处理的放到global
// lpglobal
for (auto &ckVal : lp.ckeyReadEndsMap) {
auto &pairArr = g.ckeyReadEndsMap[ckVal.first];
pairArr.insert(pairArr.end(), ckVal.second.begin(), ckVal.second.end());
}
lp.ckeyReadEndsMap.clear();
// 更新一下冗余结果
//
refreshMarkDupData(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, lp, cp);
// 处理g中新找到的匹配
// g
putDupinfoToGlobal(g, lp);
for (auto &unPair : lsm.unpairedDic) {
@ -620,16 +620,16 @@ static void *pipeRead(void *data) {
while (1) {
PROF_START(read_wait);
yarn::POSSESS(pipeArg.readSig);
yarn::WAIT_FOR(pipeArg.readSig, yarn::NOT_TO_BE, 1); // 只有一个坑因为在bambuf内部支持异步读取
yarn::WAIT_FOR(pipeArg.readSig, yarn::NOT_TO_BE, 1); // bambuf
PROF_END(gprof[GP_read_wait], read_wait);
size_t readNum = 0;
PROF_START(read);
if (inBamBuf.ReadStat() >= 0)
readNum = inBamBuf.ReadBam(); // 读入新一轮的数据
readNum = inBamBuf.ReadBam(); //
PROF_END(gprof[GP_read], read);
if (readNum < 1) {
pipeArg.readFinish = 1;
yarn::TWIST(pipeArg.readSig, yarn::BY, 1); // 读入了一轮数据
yarn::TWIST(pipeArg.readSig, yarn::BY, 1); //
break;
}
spdlog::info("{} reads processed in {} round", readNum, pipeArg.readOrder);
@ -640,9 +640,9 @@ static void *pipeRead(void *data) {
pipeArg.readData.taskSeq = pipeArg.readOrder;
readNumSum += readNum;
inBamBuf.ClearAll(); // 清理上一轮读入的数据
inBamBuf.ClearAll(); //
pipeArg.readOrder += 1; // for next
yarn::TWIST(pipeArg.readSig, yarn::BY, 1); // 读入了一轮数据
yarn::TWIST(pipeArg.readSig, yarn::BY, 1); //
}
spdlog::info("{} total reads processed", readNumSum);
return 0;
@ -659,21 +659,21 @@ static void *pipeGenRE(void *data) {
while (1) {
PROF_START(gen_wait);
yarn::POSSESS(pipeArg.readSig);
yarn::WAIT_FOR(pipeArg.readSig, yarn::NOT_TO_BE, 0); // 等待有数据
yarn::WAIT_FOR(pipeArg.readSig, yarn::NOT_TO_BE, 0); //
yarn::POSSESS(pipeArg.genRESig);
PROF_END(gprof[GP_gen_wait], gen_wait);
yarn::WAIT_FOR(pipeArg.genRESig, yarn::NOT_TO_BE, pipeArg.GENBUFNUM); // BUFNUM个坑
yarn::RELEASE(pipeArg.genRESig); // 因为有不止一个位置所以要释放
yarn::WAIT_FOR(pipeArg.genRESig, yarn::NOT_TO_BE, pipeArg.GENBUFNUM); // BUFNUM
yarn::RELEASE(pipeArg.genRESig); //
if (pipeArg.readFinish) { // 读取结束的时候没有新的数据需要处理了
if (pipeArg.readFinish) { //
yarn::POSSESS(pipeArg.genRESig);
pipeArg.genREFinish = 1;
yarn::TWIST(pipeArg.genRESig, yarn::BY, 1);
yarn::TWIST(pipeArg.readSig, yarn::BY, -1);
break;
}
/* 处理bam生成readends */
/* bamreadends */
PROF_START(gen);
doGenRE(pipeArg);
PROF_END(gprof[GP_gen], gen);
@ -681,7 +681,7 @@ static void *pipeGenRE(void *data) {
yarn::POSSESS(pipeArg.genRESig);
pipeArg.genREOrder += 1;
yarn::TWIST(pipeArg.genRESig, yarn::BY, 1);
yarn::TWIST(pipeArg.readSig, yarn::BY, -1); // 使用了一次读入的数据
yarn::TWIST(pipeArg.readSig, yarn::BY, -1); //
}
return 0;
}
@ -691,19 +691,19 @@ static void *pipeSort(void *data) {
while (1) {
PROF_START(sort_wait);
yarn::POSSESS(pipeArg.genRESig);
yarn::WAIT_FOR(pipeArg.genRESig, yarn::NOT_TO_BE, 0); // 等待有数据
yarn::WAIT_FOR(pipeArg.genRESig, yarn::NOT_TO_BE, 0); //
yarn::RELEASE(pipeArg.genRESig);
PROF_END(gprof[GP_sort_wait], sort_wait);
yarn::POSSESS(pipeArg.sortSig);
yarn::WAIT_FOR(pipeArg.sortSig, yarn::NOT_TO_BE, pipeArg.SORTBUFNUM); // BUFNUM个位置
yarn::WAIT_FOR(pipeArg.sortSig, yarn::NOT_TO_BE, pipeArg.SORTBUFNUM); // BUFNUM
yarn::RELEASE(pipeArg.sortSig);
if (pipeArg.genREFinish) {
// 处理完剩余数据
//
while (pipeArg.sortOrder < pipeArg.genREOrder) {
yarn::POSSESS(pipeArg.sortSig);
yarn::WAIT_FOR(pipeArg.sortSig, yarn::NOT_TO_BE, pipeArg.SORTBUFNUM); // BUFNUM个位置
yarn::WAIT_FOR(pipeArg.sortSig, yarn::NOT_TO_BE, pipeArg.SORTBUFNUM); // BUFNUM
yarn::RELEASE(pipeArg.sortSig);
PROF_START(sort);
@ -719,7 +719,7 @@ static void *pipeSort(void *data) {
yarn::TWIST(pipeArg.sortSig, yarn::BY, 1);
break;
}
/* 排序 readends */
/* readends */
PROF_START(sort);
doSort(pipeArg);
PROF_END(gprof[GP_sort], sort);
@ -739,7 +739,7 @@ static void *pipeMarkDup(void *data) {
while (1) {
PROF_START(markdup_wait);
yarn::POSSESS(pipeArg.sortSig);
yarn::WAIT_FOR(pipeArg.sortSig, yarn::NOT_TO_BE, 0); // 等待有数据
yarn::WAIT_FOR(pipeArg.sortSig, yarn::NOT_TO_BE, 0); //
yarn::RELEASE(pipeArg.sortSig);
PROF_END(gprof[GP_markdup_wait], markdup_wait);
@ -748,7 +748,7 @@ static void *pipeMarkDup(void *data) {
yarn::RELEASE(pipeArg.markDupSig);
if (pipeArg.sortFinish) {
// 应该还得处理剩余的数据
//
while (pipeArg.markDupOrder < pipeArg.sortOrder) {
yarn::POSSESS(pipeArg.markDupSig);
yarn::WAIT_FOR(pipeArg.markDupSig, yarn::NOT_TO_BE, pipeArg.MARKBUFNUM);
@ -767,7 +767,7 @@ static void *pipeMarkDup(void *data) {
yarn::TWIST(pipeArg.markDupSig, yarn::TO, 2 + pipeArg.MARKBUFNUM);
break;
}
/* 冗余检测 readends */
/* readends */
PROF_START(markdup);
doMarkDup(pipeArg);
PROF_END(gprof[GP_markdup], markdup);
@ -787,7 +787,7 @@ static void *pipeIntersect(void *data) {
while (1) {
PROF_START(intersect_wait);
yarn::POSSESS(pipeArg.markDupSig);
yarn::WAIT_FOR(pipeArg.markDupSig, yarn::TO_BE_MORE_THAN, kInitIntersectOrder); // 等待有数据
yarn::WAIT_FOR(pipeArg.markDupSig, yarn::TO_BE_MORE_THAN, kInitIntersectOrder); //
yarn::RELEASE(pipeArg.markDupSig);
PROF_END(gprof[GP_intersect_wait], intersect_wait);
@ -801,7 +801,7 @@ static void *pipeIntersect(void *data) {
break;
}
/* 交叉数据处理 readends */
/* readends */
PROF_START(intersect);
doIntersect(pipeArg);
PROF_END(gprof[GP_intersect], intersect);
@ -814,7 +814,7 @@ static void *pipeIntersect(void *data) {
return 0;
}
/* 当所有任务结束后global data里还有未处理的数据 */
/* global data */
static void processLastData(PipelineArg &pipeArg) {
IntersectData &g = pipeArg.intersectData;
MarkDupData &lp = pipeArg.markDupData[(pipeArg.intersectOrder - 1) % pipeArg.MARKBUFNUM];
@ -823,7 +823,7 @@ static void processLastData(PipelineArg &pipeArg) {
if (lsm.pairs.size() > 0) {
lastLeft = lsm.frags[0].Left();
}
// 处理找到匹配的global数据
// global
TaskSeqDupInfo t;
for (auto itr = g.ckeyReadEndsMap.begin(); itr != g.ckeyReadEndsMap.end();) {
auto &ckVal = *itr;
@ -836,9 +836,9 @@ static void processLastData(PipelineArg &pipeArg) {
processPairs(pairArr, &t.dupIdx, &t.opticalDupIdx, &t.repIdx);
itr = g.ckeyReadEndsMap.erase(itr);
}
// 更新一下冗余结果
//
refreshMarkDupData(t.dupIdx, t.opticalDupIdx, t.repIdx, t.notDupIdx, t.notOpticalDupIdx, t.notRepIdx, lp);
// 处理g中新找到的匹配
// g
putDupinfoToGlobal(g, lp);
}
@ -866,7 +866,7 @@ static void parallelPipeline() {
// spdlog::info("result size : {} GB", nsgv::gDupRes.byteSize() / 1024.0 / 1024 / 1024);
}
/* 并行流水线方式处理数据标记冗余 */
/* */
void PipelineMarkDups() {
if (nsgv::gMdArg.NUM_THREADS > 1)
return parallelPipeline();
@ -879,11 +879,11 @@ void PipelineMarkDups() {
for (int i = 0; i < pipeArg.GENBUFNUM; ++i) {
pipeArg.genREData[i].Init(pipeArg.numThread);
}
pipeArg.intersectOrder = 1; // do intersect 1开始
pipeArg.intersectOrder = 1; // do intersect 1
while (1) {
size_t readNum = 0;
if (inBamBuf.ReadStat() >= 0)
readNum = inBamBuf.ReadBam(); // 读入新一轮的数据
readNum = inBamBuf.ReadBam(); //
if (readNum < 1) {
break;
}
@ -908,7 +908,7 @@ void PipelineMarkDups() {
}
readNumSum += readNum;
inBamBuf.ClearAll(); // 清理上一轮读入的数据
inBamBuf.ClearAll(); //
pipeArg.readOrder += 1; // for next
}
processLastData(pipeArg);

View File

@ -8,22 +8,22 @@
struct ReadData {
vector<BamWrap *> bams; // read step output
int64_t bamStartIdx = 0; // 每轮读入的bam数组起始位置在全局bam中的索引
int64_t taskSeq = 0; // 任务序号
int64_t bamStartIdx = 0; // bambam
int64_t taskSeq = 0; //
};
struct GenREData {
vector<vector<ReadEnds>> pairsArr; // 成对的reads
vector<vector<ReadEnds>> fragsArr; // 暂未找到配对的reads
vector<UnpairedNameMap> unpairedDicArr; // 用来寻找pair end
vector<vector<ReadEnds>> pairsArr; // reads
vector<vector<ReadEnds>> fragsArr; // reads
vector<UnpairedNameMap> unpairedDicArr; // pair end
void Init(int nThread) {
for (int i = 0; i <= nThread; ++i) { // 比线程多一个主要是pairs多一个其他没用
for (int i = 0; i <= nThread; ++i) { // pairs
pairsArr.push_back(vector<ReadEnds>());
fragsArr.push_back(vector<ReadEnds>());
unpairedDicArr.push_back(UnpairedNameMap());
}
}
UnpairedNameMap unpairedDic; // 代替sort step中一部分计算
UnpairedNameMap unpairedDic; // sort step
size_t byteSize() {
size_t bytes = 0;
for (auto &v : pairsArr)
@ -38,9 +38,9 @@ struct GenREData {
};
struct SortMarkData {
vector<ReadEnds> pairs; // 成对的reads
vector<ReadEnds> frags; // 暂未找到配对的reads
UnpairedNameMap unpairedDic; // 用来寻找pair end
vector<ReadEnds> pairs; // reads
vector<ReadEnds> frags; // reads
UnpairedNameMap unpairedDic; // pair end
size_t byteSize() {
size_t bytes = 0;
for (auto &r : pairs) bytes += sizeof(r);
@ -56,11 +56,11 @@ struct SortData {
};
struct MarkDupData {
int64_t taskSeq = 0; // 任务序号
DPSet<DupInfo> pairDupIdx; // pair的冗余read的索引
MDSet<int64_t> pairOpticalDupIdx; // optical冗余read的索引
DPSet<DupInfo> fragDupIdx; // frag的冗余read的索引
DPSet<DupInfo> pairRepIdx; // pairdupset代表read的索引
int64_t taskSeq = 0; //
DPSet<DupInfo> pairDupIdx; // pairread
MDSet<int64_t> pairOpticalDupIdx; // opticalread
DPSet<DupInfo> fragDupIdx; // fragread
DPSet<DupInfo> pairRepIdx; // pairdupsetread
CkeyReadEndsMap ckeyReadEndsMap;
volatile void *dataPtr; // SortMarkData pointer
@ -111,11 +111,11 @@ struct DupResult {
};
struct IntersectData {
UnpairedNameMap unpairedDic; // 用来寻找pair end
UnpairedNameMap unpairedDic; // pair end
CkeyReadEndsMap ckeyReadEndsMap;
int64_t rightPos = 0;
// 每个task对应一个vector
// taskvector
vector<vector<DupInfo>> &dupIdxArr;
vector<vector<int64_t>> &opticalDupIdxArr;
vector<vector<DupInfo>> &repIdxArr;
@ -138,7 +138,7 @@ struct IntersectData {
}
};
// 记录流水线状态task的序号以及某阶段是否结束
// task
struct PipelineArg {
static const int GENBUFNUM = 2;
static const int SORTBUFNUM = 2;
@ -161,8 +161,8 @@ struct PipelineArg {
yarn::lock_t *markDupSig;
PipelineArg(DupResult *resPtr) : intersectData(resPtr) {
readSig = yarn::NEW_LOCK(0); // 最大值1, bufferbambuf中实现了对调用透明
genRESig = yarn::NEW_LOCK(0); // 最大值2, buffer
readSig = yarn::NEW_LOCK(0); // 1, bufferbambuf
genRESig = yarn::NEW_LOCK(0); // 2, buffer
sortSig = yarn::NEW_LOCK(0);
markDupSig = yarn::NEW_LOCK(0);
for (int i = 0; i < SORTBUFNUM; ++i) {
@ -208,8 +208,8 @@ struct PipelineArg {
};
struct REArrIdIdx {
int arrId = 0; // 数组索引
uint64_t arrIdx = 0; // 数组内部当前索引
int arrId = 0; //
uint64_t arrIdx = 0; //
const ReadEnds *pE = nullptr;
};
@ -224,7 +224,7 @@ struct REPairGreaterThan {
template <typename CMP>
struct ReadEndsHeap {
// 将冗余索引和他对应的task vector对应起来
// task vector
vector<vector<ReadEnds>> *arr2d;
priority_queue<REArrIdIdx, vector<REArrIdIdx>, CMP> minHeap;
uint64_t popNum = 0;
@ -269,5 +269,5 @@ struct ReadEndsHeap {
}
};
// 并行运行mark duplicate
// mark duplicate
void PipelineMarkDups();

View File

@ -19,13 +19,13 @@ using std::string;
using std::unordered_set;
using std::vector;
/* 存放未匹配readend相同位点的所有readend */
/* readendreadend */
struct UnpairedREInfo {
int64_t taskSeq; // 对应第几轮计算
int64_t taskSeq; //
ReadEnds unpairedRE;
};
/* 对于一个pair数据一个完整的计算点包含read1的比对位置和read2的比对位置 */
/* pairread1read2 */
struct CalcKey {
int8_t orientation = -1;
int32_t read1ReferenceIndex = -1;
@ -70,7 +70,7 @@ struct CalcKey {
comp = read2ReferenceIndex - o.read2ReferenceIndex;
if (comp == 0)
comp = read2Coordinate - o.read2Coordinate;
// 需要orientation因为要跟排序的比较方式和顺序一致
// orientation
if (comp == 0)
comp = orientation - o.orientation;
return comp < 0;
@ -91,7 +91,7 @@ struct CalcKey {
comp = read2ReferenceIndex - o.read2ReferenceIndex;
if (comp == 0)
comp = read2Coordinate - o.read2Coordinate;
// 需要orientation因为要跟排序的比较方式和顺序一致
// orientation
if (comp == 0)
comp = orientation - o.orientation;
return comp < 0;
@ -113,10 +113,10 @@ struct CalcKeyEqual {
bool operator()(const CalcKey &o1, const CalcKey &o2) const { return o1 == o2; }
};
/* 用来记录冗余索引相关的信息 */
/* */
struct DupInfo {
int16_t dupSet = 0; // dup set size
uint16_t repIdxHigh = 0; // 这一批冗余中的非冗余read 代表的索引
uint16_t repIdxHigh = 0; // read
uint32_t repIdxLow = 0;
int64_t idx;
@ -164,7 +164,7 @@ using CalcSet = set<T>;
using ReadEndsSet = tsl::robin_set<ReadEnds, ReadEndsHash, ReadEndsEqual>;
/* 当遗留数据在当前任务找到了pair read进行冗余计算时候存放结果的数据结构 */
/* pair read */
struct TaskSeqDupInfo {
DPSet<DupInfo> dupIdx;
MDSet<int64_t> opticalDupIdx;
@ -174,7 +174,7 @@ struct TaskSeqDupInfo {
MDSet<int64_t> notRepIdx;
};
/* 保存有未匹配pair位点的信息包括read end数组和有几个未匹配的read end */
/* pairread endread end */
struct UnpairedPosInfo {
int unpairedNum = 0;
int64_t taskSeq;
@ -184,33 +184,33 @@ struct UnpairedPosInfo {
// typedef unordered_map<string, UnpairedREInfo> UnpairedNameMap;
// typedef unordered_map<int64_t, UnpairedPosInfo> UnpairedPositionMap;
typedef tsl::robin_map<string, UnpairedREInfo> UnpairedNameMap; // read name为索引保存未匹配的pair read
//typedef map<string, UnpairedREInfo> UnpairedNameMap; // read name为索引保存未匹配的pair read
typedef tsl::robin_map<int64_t, UnpairedPosInfo> UnpairedPositionMap; // 以位点为索引保存该位点包含的对应的所有read和该位点包含的剩余未匹配的read的数量
// typedef map<CalcKey, vector<ReadEnds>> CkeyReadEndsMap; // calckey为关键字保存在相邻数据块之前找到的匹配readEnds
typedef tsl::robin_map<string, UnpairedREInfo> UnpairedNameMap; // read namepair read
//typedef map<string, UnpairedREInfo> UnpairedNameMap; // read namepair read
typedef tsl::robin_map<int64_t, UnpairedPosInfo> UnpairedPositionMap; // readread
// typedef map<CalcKey, vector<ReadEnds>> CkeyReadEndsMap; // calckeyreadEnds
typedef unordered_map<CalcKey, vector<ReadEnds>, CalcKeyHash, CalcKeyEqual>
CkeyReadEndsMap; // calckey为关键字保存在相邻数据块之前找到的匹配readEnds
CkeyReadEndsMap; // calckeyreadEnds
// typedef tsl::robin_map<CalcKey, vector<ReadEnds>, CalcKeyHash, CalcKeyEqual> CkeyReadEndsMap; //
// calckey为关键字保存在相邻数据块之前找到的匹配readEnds
// calckeyreadEnds
/* 单线程处理冗余参数结构体 */
/* */
struct MarkDupDataArg {
int64_t taskSeq; // 任务序号
int64_t bamStartIdx; // 当前vBam数组中第一个bam记录在整体bam中所处的位置
vector<BamWrap *> bams; // 存放待处理的bam read
vector<ReadEnds> pairs; // 成对的reads
vector<ReadEnds> frags; // 暂未找到配对的reads
DPSet<DupInfo> pairDupIdx; // pair的冗余read的索引
MDSet<int64_t> pairOpticalDupIdx; // optical冗余read的索引
DPSet<DupInfo> fragDupIdx; // frag的冗余read的索引
DPSet<DupInfo> pairRepIdx; // pairdupset代表read的索引
MDSet<int64_t> pairSingletonIdx; // 某位置只有一对read的所有read pair个数
UnpairedNameMap unpairedDic; // 用来寻找pair end
UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd为了避免重复存储
int64_t taskSeq; //
int64_t bamStartIdx; // vBambambam
vector<BamWrap *> bams; // bam read
vector<ReadEnds> pairs; // reads
vector<ReadEnds> frags; // reads
DPSet<DupInfo> pairDupIdx; // pairread
MDSet<int64_t> pairOpticalDupIdx; // opticalread
DPSet<DupInfo> fragDupIdx; // fragread
DPSet<DupInfo> pairRepIdx; // pairdupsetread
MDSet<int64_t> pairSingletonIdx; // readread pair
UnpairedNameMap unpairedDic; // pair end
UnpairedPositionMap unpairedPosArr; // ReadEndReadEnd
};
/*
*
*
*/
template <typename T>
struct PairArrIdIdx {
@ -226,9 +226,9 @@ struct IdxGreaterThan {
template <typename T>
struct DupIdxQueue {
// 将冗余索引和他对应的task vector对应起来
// task vector
// 由于是多个task来查找冗余所以每次找到的冗余index都放在一个独立的vectorvector之间可能有重叠所以需要用一个最小堆来维护
// taskindexvectorvector
vector<vector<T>> *dupIdx2DArr;
priority_queue<PairArrIdIdx<T>, vector<PairArrIdIdx<T>>, IdxGreaterThan<T>> minHeap;
uint64_t popNum = 0;

View File

@ -1,6 +1,6 @@
/*
Description: read
ends
ends
Copyright : All right reserved by ICT
@ -39,15 +39,15 @@ struct PhysicalLocation {
int16_t y = -1;
};
/* 包含了所有read ends信息picard里边的 ReadEndsForMarkDuplicates*/
/* read endspicard ReadEndsForMarkDuplicates*/
struct ReadEnds : PhysicalLocation {
static const int8_t F = 0, R = 1, FF = 2, FR = 3, RR = 4, RF = 5;
/* 保留一些bam记录中的数据 */
/* bam */
bool read1FirstOfPair = true;
/* ReadEnds中的成员变量 */
/* ReadEnds */
/** Little struct-like class to hold read pair (and fragment) end data for
* duplicate marking. */
// int16_t libraryId; // 没用不考虑多样本
// int16_t libraryId; //
int8_t orientation = -1;
int32_t read1ReferenceIndex = -1;
int32_t read1Coordinate = -1;
@ -55,8 +55,8 @@ struct ReadEnds : PhysicalLocation {
// This field is overloaded for flow based processing as the end coordinate of read 1. (paired reads not supported)
int32_t read2Coordinate = -1;
/* Additional information used to detect optical dupes */
// int16_t readGroup = -1; 一般经过比对后的bam文件只有一个read
// groupnormal或者tumor
// int16_t readGroup = -1; bamread
// groupnormaltumor
/** For optical duplicate detection the orientation matters regard to 1st or
* 2nd end of a mate */
int8_t orientationForOpticalDuplicates = -1;
@ -64,7 +64,7 @@ struct ReadEnds : PhysicalLocation {
*/
bool isOpticalDuplicate = false;
/* ReadEndsForMarkDuplicates中的成员变量 */
/* ReadEndsForMarkDuplicates */
/** Little struct-like class to hold read pair (and fragment) end data for
* MarkDuplicatesWithMateCigar **/
int16_t score = 0;
@ -72,20 +72,20 @@ struct ReadEnds : PhysicalLocation {
int64_t read2IndexInFile = -1;
int64_t duplicateSetSize = -1;
/* ReadEndsForMarkDuplicatesWithBarcodes中的成员变量 (好像用不到) */
/* ReadEndsForMarkDuplicatesWithBarcodes () */
// int32_t barcode = 0; // primary barcode for this read (and pair)
// int32_t readOneBarcode = 0; // read one barcode, 0 if not present
// int32_t readTwoBarcode = 0; // read two barcode, 0 if not present or not
// paired
/* zzh增加的成员变量 */
int64_t posKey = -1; // 根据位置信息生成的关键字 return (int64_t)tid <<
// MAX_CONTIG_LEN_SHIFT | (int64_t)pos; 包含clip的序列也就是可能比map结果更偏左
/* zzh */
int64_t posKey = -1; // return (int64_t)tid <<
// MAX_CONTIG_LEN_SHIFT | (int64_t)pos; clipmap
/* 用来做一些判断因为一些readends会做多次操作比如task之间有重叠等等 */
/* readendstask */
int oprateTime = 0;
/* 根据pairend read的比对方向来确定整体的比对方向 */
/* pairend read */
static int8_t GetOrientationByte(bool read1NegativeStrand, bool read2NegativeStrand) {
if (read1NegativeStrand) {
if (read2NegativeStrand)
@ -100,7 +100,7 @@ struct ReadEnds : PhysicalLocation {
}
}
/* 比较两个readends是否一样有个冗余 */
/* readends */
static bool AreComparableForDuplicates(const ReadEnds &lhs, const ReadEnds &rhs, bool compareRead2) {
bool areComparable = true;
areComparable = lhs.read1ReferenceIndex == rhs.read1ReferenceIndex &&
@ -112,15 +112,15 @@ struct ReadEnds : PhysicalLocation {
return areComparable;
}
/* 比对方向是否正向 */
/* */
bool IsPositiveStrand() const { return orientation == F; }
/* pairend是否合适的比对上了 */
/* pairend */
bool IsPaired() const { return read2ReferenceIndex != -1; }
bool IsNegativeStrand() const { return orientation == R; }
// 对于相交的数据进行比对a是否小于b根据AreComparableForDuplicates函数得来
// abAreComparableForDuplicates
static inline bool ReadLittleThan(const ReadEnds &a, const ReadEnds &b, bool compareRead2 = false) {
int comp = a.read1ReferenceIndex - b.read1ReferenceIndex;
if (comp == 0)
@ -141,7 +141,7 @@ struct ReadEnds : PhysicalLocation {
int comp = a.read1ReferenceIndex - b.read1ReferenceIndex;
if (comp == 0)
comp = a.read1Coordinate - b.read1Coordinate;
if (comp == 0) // 这个放在坐标比较之前因为在解析bam的时候可能有的给read2ReferenceIndex赋值了,有的则没赋值
if (comp == 0) // bamread2ReferenceIndex,
comp = a.orientation - b.orientation;
if (comp == 0)
comp = a.read2ReferenceIndex - b.read2ReferenceIndex;
@ -150,7 +150,7 @@ struct ReadEnds : PhysicalLocation {
// if (comp == 0)
// comp = a.tile - b.tile;
// if (comp == 0)
// comp = a.x - b.x; // 由于picardbugshort类型来表示xy导致其可能为负数
// comp = a.x - b.x; // picardbugshortxy
// if (comp == 0)
// comp - a.y - b.y;
if (comp == 0)
@ -168,12 +168,12 @@ struct ReadEnds : PhysicalLocation {
comp = a.read2ReferenceIndex - b.read2ReferenceIndex;
if (comp == 0)
comp = a.read2Coordinate - b.read2Coordinate;
if (comp == 0) // 这个放在坐标比较了之后把坐标范围的放在之前这样对分段数据块比较好处理
if (comp == 0) //
comp = a.orientation - b.orientation;
// if (comp == 0)
// comp = a.tile - b.tile;
// if (comp == 0)
// comp = a.x - b.x; // 由于picardbugshort类型来表示xy导致其可能为负数
// comp = a.x - b.x; // picardbugshortxy
// if (comp == 0)
// comp - a.y - b.y;
if (comp == 0)
@ -191,7 +191,7 @@ struct ReadEnds : PhysicalLocation {
comp = a.read2ReferenceIndex - b.read2ReferenceIndex;
if (comp == 0)
comp = a.read2Coordinate - b.read2Coordinate;
if (comp == 0) // 这个放在坐标比较了之后把坐标范围的放在之前这样对分段数据块比较好处理
if (comp == 0) //
comp = a.orientation - b.orientation;
return comp < 0;
}

View File

@ -1,5 +1,5 @@
/*
Description: readnametile, x, y
Description: readnametile, x, y
Copyright : All right reserved by ICT
@ -28,7 +28,7 @@ using std::string;
* Provides access to the physical location information about a cluster.
* All values should be defaulted to -1 if unavailable. ReadGroup and Tile
* should only allow non-zero positive integers, x and y coordinates may be
* negative. 线
* negative.
*/
struct ReadNameParser {
/**
@ -72,7 +72,7 @@ struct ReadNameParser {
warnAboutRegexNotMatching = isWarn;
}
/* 重新设置readNameRegex */
/* readNameRegex */
void SetReadNameRegex(const string &strReadNameRegex) {
readNameRegex = strReadNameRegex;
if (strReadNameRegex == DEFAULT_READ_NAME_REGEX)
@ -83,7 +83,7 @@ struct ReadNameParser {
// readNamePattern = strReadNameRegex;
}
/* 添加测序时候的tile x y 信息 */
/* tile x y */
bool AddLocationInformation(const string &readName, PhysicalLocation *loc) {
if (!(readName == readNameStored)) {
if (ReadLocationInformation(readName, loc)) {

View File

@ -1,5 +1,5 @@
/*
Description: sam/bambuf
Description: sam/bambuf
Copyright : All right reserved by ICT
@ -10,25 +10,25 @@
#include "bam_buf.h"
/*
* BamBuf
* BamBuf
*/
// 读取数据直到读完或者缓冲区满
//
int BamBuf::ReadBam() {
int read_num = 0;
if (handle_last) { // 处理上次读入的最后一个bam
if (has_enough_space()) { // 必须调用在边界处调整memffset
if (handle_last) { // bam
if (has_enough_space()) { // memffset
++read_num;
append_one_bam();
} else {
return read_num; // 还是没空间
return read_num; //
}
}
while (read_stat_ >= 0 && (read_stat_ = sam_read1(fp, hdr, bw->b)) >= 0) {
bw->end_pos_ = BamWrap::BamEndPos(bw->b);
if (has_enough_space()) { // 还有空间
// if (true) { // 还有空间
if (has_enough_space()) { //
// if (true) { //
append_one_bam();
++read_num; // 放进缓存才算读取到
++read_num; //
} else {
break;
}
@ -41,7 +41,7 @@ int BamBuf::ReadBam() {
return read_num;
}
// 初始化缓存
//
void BamBuf::Init(samFile *fp, sam_hdr_t *hdr, int64_t mem_size) {
this->fp = fp;
this->hdr = hdr;
@ -71,9 +71,9 @@ void BamBuf::ClearAll() {
prepare_read();
}
// 为下一次读取做准备, 计算一些边界条件
// ,
inline void BamBuf::prepare_read() {
// 计算余留的下次计算可能用到的bam所占的位置
// bam
if (bv.size() > 0) {
BamWrap *bw = bv[0];
legacy_start = (int64_t)bw - (int64_t)mem;
@ -81,11 +81,11 @@ inline void BamBuf::prepare_read() {
legacy_end = (int64_t)bw + bw->length() - (int64_t)mem;
} else {
legacy_start = legacy_end = 0;
mem_offset = 0; // 上次没剩下那就从头存储
mem_offset = 0; //
}
}
// 检查缓存是否还有空间
//
inline bool BamBuf::has_enough_space() {
const uint32_t bam_len = bw->length();
int64_t potential_end = mem_offset + bam_len;
@ -104,7 +104,7 @@ inline bool BamBuf::has_enough_space() {
return potential_end < legacy_start;
}
// 处理一个读取后的bam
// bam
inline void BamBuf::append_one_bam() {
BamWrap *bwp = (BamWrap *)(mem + mem_offset);
*bwp = *bw;
@ -113,15 +113,15 @@ inline void BamBuf::append_one_bam() {
*bp = *bw->b;
bp->data = (uint8_t *)((char *)bwp->b + sizeof(bam1_t));
memcpy(bp->data, bw->b->data, bw->b->l_data);
// 更新下次存储的位置
//
mem_offset = (mem_offset + bw->length() + 8 - 1) & ~((size_t)(8 - 1));
bv.push_back(bwp);
}
// 处理上次读入的最后一个read
// read
inline bool BamBuf::handle_last_read() {
if (handle_last) { // 处理上次读入的最后一个bam
if (has_enough_space()) { // 必须调用在边界处调整memffset
if (handle_last) { // bam
if (has_enough_space()) { // memffset
append_one_bam();
handle_last = false;
return true;
@ -131,9 +131,9 @@ inline bool BamBuf::handle_last_read() {
}
/*
* AsyncIoBamBuf
* AsyncIoBamBuf
*/
// 初始化缓存
//
void AsyncIoBamBuf::Init(samFile *fp, sam_hdr_t *hdr, int64_t mem_size) {
if (use_async_io_) {
buf1_.Init(fp, hdr, mem_size >> 1);
@ -147,7 +147,7 @@ void AsyncIoBamBuf::Init(samFile *fp, sam_hdr_t *hdr, int64_t mem_size) {
}
}
// 读取数据
//
int AsyncIoBamBuf::ReadBam() {
if (use_async_io_) {
hasThread = true;
@ -178,11 +178,11 @@ int AsyncIoBamBuf::async_read_bam() {
first_read_ = false;
refresh_bam_arr();
} else {
// join, 交换缓冲区指针
// join,
pthread_join(*tid_, 0);
resize_buf();
if (need_read_) { // 需要交换指针
if (need_read_) { //
BamBuf *tmp = pi_;
pi_ = po_;
po_ = tmp;
@ -190,14 +190,14 @@ int AsyncIoBamBuf::async_read_bam() {
read_num = last_read_num_;
refresh_bam_arr();
}
// 异步读
//
pthread_create(tid_, 0, async_read, this);
return read_num;
}
void *AsyncIoBamBuf::async_read(void *data) {
AsyncIoBamBuf *ab = (AsyncIoBamBuf *)data;
if (ab->need_read_ && ab->ReadStat() >= 0) { // 需要读取
if (ab->need_read_ && ab->ReadStat() >= 0) { //
ab->last_read_num_ = ab->po_->ReadBam();
} else {
ab->last_read_num_ = 0;
@ -205,23 +205,23 @@ void *AsyncIoBamBuf::async_read(void *data) {
pthread_exit(0);
}
// 为下一次读取做准备,
// 计算一些边界条件延迟操作因为此时可能po_对应的buf正在读取
// ,
// po_buf
void AsyncIoBamBuf::ClearBeforeIdx(size_t idxInBv) { clear_before_idx_ = idxInBv; }
// 清空上一次所有读入的数据延迟操作因为此时可能po_对应的buf正在读取
// po_buf
void AsyncIoBamBuf::ClearAll() { clear_all_ = true; }
inline void AsyncIoBamBuf::resize_buf() {
if (clear_all_) { // 清理上一轮的数据
if (clear_all_) { //
clear_all_ = false;
po_->ClearBeforeIdx(legacy_size_);
pi_->ClearAll();
if (pi_->handle_last_read()) { // 上次读取有一个read没放入缓存
if (pi_->handle_last_read()) { // read
last_read_num_ += 1;
legacy_size_ = pi_->Size(); // 应该只有一个read
legacy_size_ = pi_->Size(); // read
need_read_ = true;
} else { // 没空间存放则不交换指针或者文件已经读取完毕
} else { //
legacy_size_ = 0;
need_read_ = false;
}
@ -229,16 +229,16 @@ inline void AsyncIoBamBuf::resize_buf() {
if (clear_before_idx_ < legacy_size_) {
po_->ClearBeforeIdx(clear_before_idx_);
legacy_size_ -= clear_before_idx_;
// 不需要交换指针不需要读取
//
need_read_ = false;
} else {
po_->ClearBeforeIdx(legacy_size_);
pi_->ClearBeforeIdx(clear_before_idx_ - legacy_size_);
if (pi_->handle_last_read()) { // 上次读取有一个read没放入缓存
if (pi_->handle_last_read()) { // read
last_read_num_ += 1;
legacy_size_ = pi_->Size(); // 应该只有一个read
legacy_size_ = pi_->Size(); // read
need_read_ = true;
} else { // 没空间存放则不交换指针或者文件已经读取完毕
} else { //
legacy_size_ = 0;
need_read_ = false;
}

View File

@ -1,5 +1,5 @@
/*
Description: sam/bambuf
Description: sam/bambuf
Copyright : All right reserved by ICT
@ -27,32 +27,32 @@ using std::vector;
using namespace std;
/*
* bam
* bam
*/
struct BamBuf {
sam_hdr_t *hdr; // sam文件的header信息
samFile *fp; // sam文件指针
BamWrap *bw = nullptr; // 用来循环读入bam
uint8_t *mem = nullptr; // 用来存放bam的数据,
// 程序结束后自动释放所以没在析构函数里释放
int64_t mem_offset = 0; // 下一次要存放的位置
int64_t mem_size; // 缓存大小
int read_stat_ = 0; // 读取状态是否读完
vector<BamWrap *> bv; // 方便对bam数据的访问
int64_t legacy_start = 0; // 没处理完的bammem中的起始位置, 闭区间
int64_t legacy_end = 0; // 没处理完的bammem中的结束位置, 开区间
bool handle_last = false; // 上次最后读入的bam是否需要处理
sam_hdr_t *hdr; // samheader
samFile *fp; // sam
BamWrap *bw = nullptr; // bam
uint8_t *mem = nullptr; // bam,
//
int64_t mem_offset = 0; //
int64_t mem_size; //
int read_stat_ = 0; //
vector<BamWrap *> bv; // bam
int64_t legacy_start = 0; // bammem,
int64_t legacy_end = 0; // bammem,
bool handle_last = false; // bam
// 初始化缓存
//
void Init(samFile *fp, sam_hdr_t *hdr, int64_t mem_size);
// 读取数据直到读完或者缓冲区满
//
int ReadBam();
// 为下一次读取做准备, 计算一些边界条件
// ,
void ClearBeforeIdx(size_t idxInBv);
// 清空上一次所有读入的数据
//
void ClearAll();
inline int64_t Size() { return bv.size(); } // 包含多少个read
inline int ReadStat() { return read_stat_; } // 文件的读取状态是否可读读取完全
inline int64_t Size() { return bv.size(); } // read
inline int ReadStat() { return read_stat_; } //
~BamBuf() {
if (this->mem != nullptr) {
free(this->mem);
@ -62,7 +62,7 @@ struct BamBuf {
free(bw);
}
}
void FreeMemory() // 释放开辟的内存
void FreeMemory() //
{
if (this->mem != nullptr) {
free(this->mem);
@ -75,14 +75,14 @@ struct BamBuf {
this->bw = nullptr;
}
void prepare_read();
// 检查缓存是否还有空间
//
bool has_enough_space();
// 处理一个读取后的bam
// bam
void append_one_bam();
// 处理上次读入的最后一个read
// read
bool handle_last_read();
// 针对bv的操作
// bv
inline BamWrap *operator[](int64_t pos) { return bv[pos]; }
inline void push_back(BamWrap *val) { bv.push_back(val); }
inline void clear() { bv.clear(); }
@ -90,53 +90,53 @@ struct BamBuf {
};
/*
* io
* io
*/
struct AsyncIoBamBuf {
BamBuf buf1_;
BamBuf buf2_;
BamBuf *pi_; // 当前用的buf
BamBuf *po_; // 后台在读取的buf
BamBuf *pi_; // buf
BamBuf *po_; // buf
pthread_t *tid_ = NULL;
bool hasThread = false;
int64_t legacy_size_ = 0; // 上一轮运算之后缓存中还剩余的上次读取的read数量
int64_t legacy_size_ = 0; // read
bool first_read_ = true;
int last_read_num_ = 0; // 上一次读取了多少reads
int last_read_num_ = 0; // reads
bool need_read_ = true;
bool use_async_io_ = true;
int64_t clear_before_idx_ = 0; // 用户异步读取下一轮读取之前清理掉clear_before_idx_之前的所有reads
bool clear_all_ = false; // 用于异步读取下一轮读取之前清理掉之前的所有reads
int64_t clear_before_idx_ = 0; // clear_before_idx_reads
bool clear_all_ = false; // reads
vector<BamWrap *> bam_arr_; // 用来访问buf中的bam
vector<BamWrap *> bam_arr_; // bufbam
AsyncIoBamBuf() {}
AsyncIoBamBuf(bool use_async) : use_async_io_(use_async) {}
// 析构
//
~AsyncIoBamBuf() {
if (tid_ != NULL) {
if (hasThread)
pthread_join(*tid_, 0);
free(tid_);
}
// 其他的内存就等程序结束自动释放
// buf的析构函数会自动调用
//
// buf
}
// 初始化缓存
//
void Init(samFile *fp, sam_hdr_t *hdr, int64_t mem_size);
// 读取数据
//
int ReadBam();
// 为下一次读取做准备, 计算一些边界条件
// ,
void ClearBeforeIdx(size_t idxInBv);
vector<BamWrap *> &GetBamArr() { return bam_arr_; } // 获取bam array
// 清空上一次所有读入的数据
vector<BamWrap *> &GetBamArr() { return bam_arr_; } // bam array
//
void ClearAll();
// 包含的read数量
// read
inline int64_t Size() { return legacy_size_ + pi_->Size(); }
inline int ReadStat() { return pi_->read_stat_; }
inline BamWrap *operator[](int64_t pos) { return bam_arr_[pos]; }
// 获取某一段reads
// reads
inline vector<BamWrap *> Slice(size_t startIdx, size_t endIdx) {
if (endIdx > startIdx) {
auto begItr = bam_arr_.begin();
@ -149,11 +149,11 @@ struct AsyncIoBamBuf {
buf2_.FreeMemory();
}
// 同步读取
//
int sync_read_bam();
// 异步读取
//
int async_read_bam();
// 异步读取线程函数
//
static void *async_read(void *data);
void resize_buf();
inline void refresh_bam_arr() {

View File

@ -1,5 +1,5 @@
/*
Description: sam/bambuf
Description: sam/bambuf
Copyright : All right reserved by ICT
@ -20,37 +20,37 @@
using namespace std;
/*
线
bam
bam
*/
/*
* sam read
* sam read
*/
struct BamWrap {
// contig左移后加上pos作为全局位置
const static int MAX_CONTIG_LEN_SHIFT = 40; // 将染色体id左移多少位和位点拼合在一起
// contigpos
const static int MAX_CONTIG_LEN_SHIFT = 40; // id
const static int READ_MAX_LENGTH = 200;
const static int READ_MAX_DEPTH = 1000; // 这只是用来初始化空间用的深度大于这个值也没关系
const static int READ_MAX_DEPTH = 1000; //
// 成员变量尽量少减少占用内存空间
//
bam1_t *b;
int64_t end_pos_; // bam的全局结束位置, 闭区间
int64_t end_pos_; // bam,
// 全局开始位置
//
inline int64_t start_pos() { return bam_global_pos(b); }
// 全局结束位置
//
inline int64_t end_pos() { return end_pos_; }
// reference对应的序列长度
// reference
inline int16_t read_len() { return (end_pos_ - start_pos() + 1); }
// contig内的开始位置
// contig
inline int32_t contig_pos() { return b->core.pos; }
// contig内部的结束位置
// contig
inline int32_t contig_end_pos() { return bam_pos(end_pos_); }
// 序列的长度AGTC字母个数
// AGTC
inline int16_t seq_len() { return b->core.l_qseq; }
// 算上开头的softclip
// softclip
inline int32_t softclip_start() {
const uint32_t *cigar = bam_get_cigar(b);
const bam1_core_t &bc = b->core;
@ -61,7 +61,7 @@ struct BamWrap {
return bc.pos;
}
// 算上结尾的softclip
// softclip
inline int32_t softclip_end() {
const uint32_t *cigar = bam_get_cigar(b);
const bam1_core_t &bc = b->core;
@ -72,7 +72,7 @@ struct BamWrap {
return bam_pos(end_pos_);
}
// 算上结尾的softclip
// softclip
inline int32_t right_softclip_len() {
const uint32_t *cigar = bam_get_cigar(b);
const bam1_core_t &bc = b->core;
@ -83,7 +83,7 @@ struct BamWrap {
return 0;
}
// 获取序列
//
inline std::string sequence() {
ostringstream oss;
char *seq = (char *)bam_get_seq(b);
@ -96,9 +96,9 @@ struct BamWrap {
return std::move(oss.str());
}
// 获取名字
//
inline const char *query_name() { return bam_get_qname(b); }
// 获取cigar 字符串
// cigar
inline string cigar_str() {
ostringstream oss;
const uint32_t *cigar = bam_get_cigar(b);
@ -111,10 +111,10 @@ struct BamWrap {
return std::move(oss.str());
}
// 占用的内存大小
//
inline int16_t length() { return sizeof(*this) + sizeof(bam1_t) + b->l_data; }
// 获取cigarinsert的总长度
// cigarinsert
inline int32_t insert_cigar_len() {
const uint32_t *cigar = bam_get_cigar(b);
const bam1_core_t &bc = b->core;
@ -128,7 +128,7 @@ struct BamWrap {
return ret;
}
// 获取cigardelete的总长度
// cigardelete
inline int32_t del_cigar_len() {
const uint32_t *cigar = bam_get_cigar(b);
const bam1_core_t &bc = b->core;
@ -142,7 +142,7 @@ struct BamWrap {
return ret;
}
// 计算sam read的终点位置
// sam read
static inline int64_t BamEndPos(const bam1_t *b) {
const uint32_t *cigar = bam_get_cigar(b);
const bam1_core_t &bc = b->core;
@ -170,7 +170,7 @@ struct BamWrap {
return hasWellDefinedFragmentSize;
}
// 计算bamadapterBoundary
// bamadapterBoundary
int GetAdapterBoundary() {
const bam1_core_t &bc = b->core;
int adapterBoundary;
@ -179,11 +179,11 @@ struct BamWrap {
else if (bc.flag & BAM_FREVERSE)
adapterBoundary = bc.mpos - 1;
else
adapterBoundary = bc.pos + abs(bc.isize); // GATK4.0 GATK3.5不一样3.5的这里+1
adapterBoundary = bc.pos + abs(bc.isize); // GATK4.0 GATK3.53.5+1
return adapterBoundary;
}
// 获取开头的I的长度
// I
inline int GetHeadInsertLen() {
int insLen = 0;
const uint32_t *cigar = bam_get_cigar(b);
@ -200,8 +200,8 @@ struct BamWrap {
return insLen;
}
// 获取soft clip开始位置(能处理HS相连的情况有这种情况么,
// 注意开头的I要当做S)
// soft clip(HS,
// IS)
inline int64_t GetSoftStart() {
int64_t softStart = b->core.pos;
const uint32_t *cigar = bam_get_cigar(b);
@ -217,7 +217,7 @@ struct BamWrap {
return softStart;
}
// 获取unclipped开始位置(包括hardclip)
// unclipped(hardclip)
inline int64_t GetUnclippedStart() {
int64_t start = b->core.pos;
const uint32_t *cigar = bam_get_cigar(b);
@ -233,7 +233,7 @@ struct BamWrap {
return start;
}
// 获取unclipped结束位置(包括hardclip)
// unclipped(hardclip)
inline int64_t GetUnclippedEnd() {
int64_t end_pos = bam_endpos(b);
const uint32_t *cigar = bam_get_cigar(b);
@ -249,7 +249,7 @@ struct BamWrap {
return end_pos - 1;
}
/* 获取碱基质量分数的加和 */
/* */
/** Calculates a score for the read which is the sum of scores over Q15. */
inline int GetSumOfBaseQualities() {
int score = 0;
@ -262,9 +262,9 @@ struct BamWrap {
return score;
}
/* flag相关的检测 */
/* flag */
/* 没有比对上 unmapped */
/* unmapped */
inline bool GetReadUnmappedFlag() { return b->core.flag & BAM_FUNMAP; }
/* Template having multiple segments in sequencing */
@ -313,7 +313,7 @@ struct BamWrap {
*/
bool GetMateNegativeStrandFlag() { return b->core.flag & BAM_FMREVERSE; }
/* 其他的一些信息 */
/* */
inline int GetReferenceLength() {
int length = 0;
const uint32_t *cigar = bam_get_cigar(b);
@ -336,26 +336,26 @@ struct BamWrap {
return length;
}
// 计算bam的全局位置算上染色体序号和比对位置
// bam
static inline int64_t bam_global_pos(bam1_t *b) {
return (((int64_t)b->core.tid << MAX_CONTIG_LEN_SHIFT) | (int64_t)b->core.pos);
}
static inline int64_t bam_global_pos(int tid, int pos) {
return (((int64_t)tid << MAX_CONTIG_LEN_SHIFT) | (int64_t)pos);
}
// 根据全局位置获取bam的染色体序号
// bam
static inline int32_t bam_tid(int64_t global_pos) {
const int64_t mask = ~(((int64_t)1 << MAX_CONTIG_LEN_SHIFT) - 1);
const int64_t high_tid = global_pos & mask;
return (int32_t)(high_tid >> MAX_CONTIG_LEN_SHIFT);
}
// 根据全局位置获取bam的比对位置(染色体内)
// bam()
static inline int32_t bam_pos(int64_t global_pos) {
const int64_t mask = ((int64_t)1 << MAX_CONTIG_LEN_SHIFT) - 1;
return (int32_t)(global_pos & mask);
}
// 设置是否冗余的标记
//
void SetDuplicateReadFlag(bool flag) { setFlag(flag, BAM_FDUP); }
void setFlag(bool flag, int bit) {

View File

@ -1,5 +1,5 @@
/*
Description: Murmur
Description: Murmur
Copyright : All right reserved by ICT