改用了三数据块处理方式,解决了之前的问题,代码还没整理完全,有注释和调试信息
This commit is contained in:
parent
670d777824
commit
fb8c7755f5
|
|
@ -18,7 +18,8 @@ Date : 2023/10/23
|
||||||
#include "fastdup_version.h"
|
#include "fastdup_version.h"
|
||||||
#include "md_args.h"
|
#include "md_args.h"
|
||||||
#include "md_funcs.h"
|
#include "md_funcs.h"
|
||||||
#include "pipeline_md.h"
|
// #include "pipeline_md.h"
|
||||||
|
#include "new_pipe.h"
|
||||||
#include "read_name_parser.h"
|
#include "read_name_parser.h"
|
||||||
#include "util/profiling.h"
|
#include "util/profiling.h"
|
||||||
|
|
||||||
|
|
@ -33,9 +34,9 @@ sam_hdr_t *gInBamHeader; // 输入的bam文件头信息
|
||||||
samFile *gOutBamFp; // 输出文件, sam或者bam格式
|
samFile *gOutBamFp; // 输出文件, sam或者bam格式
|
||||||
sam_hdr_t *gOutBamHeader; // 输出文件的header
|
sam_hdr_t *gOutBamHeader; // 输出文件的header
|
||||||
DuplicationMetrics gMetrics; // 统计信息
|
DuplicationMetrics gMetrics; // 统计信息
|
||||||
PipelineArg gPipe;
|
DupResult gDupRes;
|
||||||
|
PipelineArg gPipe(&gDupRes);
|
||||||
};
|
}; // namespace nsgv
|
||||||
|
|
||||||
// 字节缓冲区
|
// 字节缓冲区
|
||||||
struct ByteBuf {
|
struct ByteBuf {
|
||||||
|
|
@ -98,7 +99,8 @@ int MarkDuplicates() {
|
||||||
|
|
||||||
/* 冗余检查和标记 */
|
/* 冗余检查和标记 */
|
||||||
PROF_START(markdup_all);
|
PROF_START(markdup_all);
|
||||||
pipelineMarkDups();
|
//pipelineMarkDups();
|
||||||
|
NewPipeMarkDups();
|
||||||
PROF_END(gprof[GP_markdup_all], markdup_all);
|
PROF_END(gprof[GP_markdup_all], markdup_all);
|
||||||
|
|
||||||
/* 标记冗余, 将处理后的结果写入文件 */
|
/* 标记冗余, 将处理后的结果写入文件 */
|
||||||
|
|
@ -157,10 +159,15 @@ int MarkDuplicates() {
|
||||||
|
|
||||||
DupIdxQueue<DupInfo> dupIdxQue, repIdxQue;
|
DupIdxQueue<DupInfo> dupIdxQue, repIdxQue;
|
||||||
DupIdxQueue<int64_t> opticalIdxQue;
|
DupIdxQueue<int64_t> opticalIdxQue;
|
||||||
dupIdxQue.Init(&nsgv::gPipe.intersectData.dupIdxArr);
|
dupIdxQue.Init(&nsgv::gDupRes.dupIdxArr);
|
||||||
repIdxQue.Init(&nsgv::gPipe.intersectData.repIdxArr);
|
repIdxQue.Init(&nsgv::gDupRes.repIdxArr);
|
||||||
opticalIdxQue.Init(&nsgv::gPipe.intersectData.opticalDupIdxArr);
|
opticalIdxQue.Init(&nsgv::gDupRes.opticalDupIdxArr);
|
||||||
spdlog::info("{} duplicate reads found", dupIdxQue.Size());
|
spdlog::info("{} duplicate reads found", dupIdxQue.Size());
|
||||||
|
spdlog::info("{} optical reads found", opticalIdxQue.Size());
|
||||||
|
spdlog::info("{} represent reads found", repIdxQue.Size());
|
||||||
|
spdlog::info("real dup size: {}", dupIdxQue.RealSize());
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
|
||||||
uint64_t bamIdx = 0;
|
uint64_t bamIdx = 0;
|
||||||
DupInfo dupIdx = dupIdxQue.Pop();
|
DupInfo dupIdx = dupIdxQue.Pop();
|
||||||
|
|
@ -177,7 +184,12 @@ int MarkDuplicates() {
|
||||||
|
|
||||||
int64_t realDupSize = 0;
|
int64_t realDupSize = 0;
|
||||||
|
|
||||||
return 0;
|
ofstream ofs("newdup.txt");
|
||||||
|
|
||||||
|
// return 0;
|
||||||
|
// for debug
|
||||||
|
int64_t maxDiff = 0;
|
||||||
|
int64_t minDiff = 0;
|
||||||
|
|
||||||
PROF_START(write);
|
PROF_START(write);
|
||||||
while (inBuf.ReadStat() >= 0) {
|
while (inBuf.ReadStat() >= 0) {
|
||||||
|
|
@ -214,15 +226,25 @@ int MarkDuplicates() {
|
||||||
}
|
}
|
||||||
|
|
||||||
/* 判断是否冗余 */
|
/* 判断是否冗余 */
|
||||||
|
// cout << dupIdx << endl;
|
||||||
if (bamIdx == dupIdx) {
|
if (bamIdx == dupIdx) {
|
||||||
|
// ofs << bamIdx << endl;
|
||||||
++realDupSize; // for test
|
++realDupSize; // for test
|
||||||
isDup = true;
|
isDup = true;
|
||||||
if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS && dupIdx.dupSet != 0) {
|
if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS && dupIdx.dupSet != 0) {
|
||||||
isInDuplicateSet = true;
|
isInDuplicateSet = true;
|
||||||
representativeReadIndexInFile = dupIdx.repIdx;
|
representativeReadIndexInFile = dupIdx.GetRepIdx();
|
||||||
duplicateSetSize = dupIdx.dupSet;
|
duplicateSetSize = dupIdx.dupSet;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 1
|
||||||
|
// spdlog::info("diff: {}", dupIdx.idx - dupIdx.repIdx);
|
||||||
|
//maxDiff = std::max(maxDiff, dupIdx.idx - dupIdx.repIdx);
|
||||||
|
//minDiff = std::min(minDiff, dupIdx.idx - dupIdx.repIdx);
|
||||||
|
//spdlog::info("min: {}, max: {}", minDiff, maxDiff);
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
// 为了防止小内存运行的时候,有重复的dupidx,这时候dup的repIdx和dupSetSize可能会有不同
|
// 为了防止小内存运行的时候,有重复的dupidx,这时候dup的repIdx和dupSetSize可能会有不同
|
||||||
while ((dupIdx = dupIdxQue.Pop()) == bamIdx);
|
while ((dupIdx = dupIdxQue.Pop()) == bamIdx);
|
||||||
if (opticalIdx == bamIdx)
|
if (opticalIdx == bamIdx)
|
||||||
|
|
@ -262,7 +284,7 @@ int MarkDuplicates() {
|
||||||
}
|
}
|
||||||
if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS && bamIdx == repIdx) { // repressent
|
if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS && bamIdx == repIdx) { // repressent
|
||||||
isInDuplicateSet = true;
|
isInDuplicateSet = true;
|
||||||
representativeReadIndexInFile = repIdx.repIdx;
|
representativeReadIndexInFile = repIdx.GetRepIdx();
|
||||||
duplicateSetSize = repIdx.dupSet;
|
duplicateSetSize = repIdx.dupSet;
|
||||||
while (repIdx == bamIdx || repIdx.dupSet == 0) {
|
while (repIdx == bamIdx || repIdx.dupSet == 0) {
|
||||||
if (repIdxQue.Size() > 0)
|
if (repIdxQue.Size() > 0)
|
||||||
|
|
@ -277,6 +299,7 @@ int MarkDuplicates() {
|
||||||
if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS && isInDuplicateSet) {
|
if (nsgv::gMdArg.TAG_DUPLICATE_SET_MEMBERS && isInDuplicateSet) {
|
||||||
if (!bw->IsSecondaryOrSupplementary() && !bw->GetReadUnmappedFlag()) {
|
if (!bw->IsSecondaryOrSupplementary() && !bw->GetReadUnmappedFlag()) {
|
||||||
// cerr << bamIdx << " " << representativeReadIndexInFile << " " << duplicateSetSize << endl;
|
// cerr << bamIdx << " " << representativeReadIndexInFile << " " << duplicateSetSize << endl;
|
||||||
|
// ofs << bamIdx << " " << representativeReadIndexInFile << " " << duplicateSetSize << endl;
|
||||||
uint8_t *oldTagVal = bam_aux_get(bw->b, nsgv::gMdArg.DUPLICATE_SET_INDEX_TAG.c_str());
|
uint8_t *oldTagVal = bam_aux_get(bw->b, nsgv::gMdArg.DUPLICATE_SET_INDEX_TAG.c_str());
|
||||||
if (oldTagVal != NULL)
|
if (oldTagVal != NULL)
|
||||||
bam_aux_del(bw->b, oldTagVal);
|
bam_aux_del(bw->b, oldTagVal);
|
||||||
|
|
@ -304,7 +327,7 @@ int MarkDuplicates() {
|
||||||
bam_aux_append(bw->b, "PG", 'Z', nsgv::gMdArg.PROGRAM_RECORD_ID.size() + 1,
|
bam_aux_append(bw->b, "PG", 'Z', nsgv::gMdArg.PROGRAM_RECORD_ID.size() + 1,
|
||||||
(const uint8_t *)nsgv::gMdArg.PROGRAM_RECORD_ID.c_str());
|
(const uint8_t *)nsgv::gMdArg.PROGRAM_RECORD_ID.c_str());
|
||||||
}
|
}
|
||||||
#if 1
|
#if 0
|
||||||
if (sam_write1(nsgv::gOutBamFp, nsgv::gOutBamHeader, bw->b) < 0) {
|
if (sam_write1(nsgv::gOutBamFp, nsgv::gOutBamHeader, bw->b) < 0) {
|
||||||
spdlog::error("failed writing sam record to \"{}\"", nsgv::gMdArg.OUTPUT_FILE.c_str());
|
spdlog::error("failed writing sam record to \"{}\"", nsgv::gMdArg.OUTPUT_FILE.c_str());
|
||||||
sam_close(nsgv::gOutBamFp);
|
sam_close(nsgv::gOutBamFp);
|
||||||
|
|
@ -322,8 +345,7 @@ int MarkDuplicates() {
|
||||||
// 计算统计信息
|
// 计算统计信息
|
||||||
nsgv::gMetrics.READ_PAIRS_EXAMINED /= 2;
|
nsgv::gMetrics.READ_PAIRS_EXAMINED /= 2;
|
||||||
nsgv::gMetrics.READ_PAIR_DUPLICATES /= 2;
|
nsgv::gMetrics.READ_PAIR_DUPLICATES /= 2;
|
||||||
for (auto &arr : nsgv::gPipe.intersectData.opticalDupIdxArr)
|
for (auto &arr : nsgv::gDupRes.opticalDupIdxArr) nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES += arr.size();
|
||||||
nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES += arr.size();
|
|
||||||
nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES = nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES / 2;
|
nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES = nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES / 2;
|
||||||
nsgv::gMetrics.ESTIMATED_LIBRARY_SIZE =
|
nsgv::gMetrics.ESTIMATED_LIBRARY_SIZE =
|
||||||
estimateLibrarySize(nsgv::gMetrics.READ_PAIRS_EXAMINED - nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES,
|
estimateLibrarySize(nsgv::gMetrics.READ_PAIRS_EXAMINED - nsgv::gMetrics.READ_PAIR_OPTICAL_DUPLICATES,
|
||||||
|
|
@ -376,5 +398,7 @@ int MarkDuplicates() {
|
||||||
sam_close(nsgv::gOutBamFp);
|
sam_close(nsgv::gOutBamFp);
|
||||||
sam_close(nsgv::gInBamFp);
|
sam_close(nsgv::gInBamFp);
|
||||||
|
|
||||||
|
ofs.close();
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
@ -64,7 +64,7 @@ struct MarkDupsArg {
|
||||||
|
|
||||||
int NUM_THREADS = 1;
|
int NUM_THREADS = 1;
|
||||||
|
|
||||||
size_t MAX_MEM = ((size_t)1) << 31; // 最小2G
|
size_t MAX_MEM = ((size_t)1) << 30; // << 31 // 最小2G
|
||||||
|
|
||||||
bool DUPLEX_IO = true; // 同时读写
|
bool DUPLEX_IO = true; // 同时读写
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -49,13 +49,22 @@ struct CalcKeyHash {
|
||||||
|
|
||||||
/* 用来记录冗余索引相关的信息 */
|
/* 用来记录冗余索引相关的信息 */
|
||||||
struct DupInfo {
|
struct DupInfo {
|
||||||
int64_t idx;
|
|
||||||
int64_t repIdx = 0; // 这一批冗余中的非冗余read 代表的索引
|
|
||||||
int16_t dupSet = 0; // dup set size
|
int16_t dupSet = 0; // dup set size
|
||||||
|
uint16_t repIdxHigh = 0; // 这一批冗余中的非冗余read 代表的索引
|
||||||
|
uint32_t repIdxLow = 0;
|
||||||
|
int64_t idx;
|
||||||
|
|
||||||
DupInfo() : DupInfo(-1, 0, 0) {}
|
DupInfo() : DupInfo(-1, 0, 0) {}
|
||||||
DupInfo(int64_t idx_) : DupInfo(idx_, 0, 0) {}
|
DupInfo(int64_t idx_) : DupInfo(idx_, 0, 0) {}
|
||||||
DupInfo(int64_t idx_, int64_t repIdx_, int dupSet_) : idx(idx_), repIdx(repIdx_), dupSet(dupSet_) {}
|
DupInfo(int64_t idx_, int64_t repIdx_, int dupSet_) : idx(idx_), dupSet(dupSet_) {
|
||||||
|
repIdxHigh = repIdx_ >> 32;
|
||||||
|
repIdxLow = (uint32_t)repIdx_;
|
||||||
|
}
|
||||||
|
int64_t GetRepIdx() {
|
||||||
|
int64_t repIdx = repIdxHigh;
|
||||||
|
repIdx = (repIdx << 32) | repIdxLow;
|
||||||
|
return repIdx;
|
||||||
|
}
|
||||||
bool operator<(const DupInfo &o) const { return idx < o.idx; }
|
bool operator<(const DupInfo &o) const { return idx < o.idx; }
|
||||||
bool operator>(const DupInfo &o) const { return idx > o.idx; }
|
bool operator>(const DupInfo &o) const { return idx > o.idx; }
|
||||||
operator int64_t() const { return idx; }
|
operator int64_t() const { return idx; }
|
||||||
|
|
@ -110,8 +119,8 @@ struct UnpairedPosInfo {
|
||||||
// typedef unordered_map<int64_t, UnpairedPosInfo> UnpairedPositionMap;
|
// typedef unordered_map<int64_t, UnpairedPosInfo> UnpairedPositionMap;
|
||||||
|
|
||||||
typedef tsl::robin_map<string, UnpairedREInfo> UnpairedNameMap; // 以read name为索引,保存未匹配的pair read
|
typedef tsl::robin_map<string, UnpairedREInfo> UnpairedNameMap; // 以read name为索引,保存未匹配的pair read
|
||||||
typedef tsl::robin_map<int64_t, UnpairedPosInfo>
|
typedef tsl::robin_map<int64_t, UnpairedPosInfo> UnpairedPositionMap; // 以位点为索引,保存该位点包含的对应的所有read和该位点包含的剩余未匹配的read的数量
|
||||||
UnpairedPositionMap; // 以位点为索引,保存该位点包含的对应的所有read和该位点包含的剩余未匹配的read的数量
|
typedef map<CalcKey, vector<ReadEnds>> CkeyReadEndsMap; // 以calckey为关键字,保存在相邻数据块之前找到的匹配readEnds
|
||||||
|
|
||||||
/* 单线程处理冗余参数结构体 */
|
/* 单线程处理冗余参数结构体 */
|
||||||
struct MarkDupDataArg {
|
struct MarkDupDataArg {
|
||||||
|
|
@ -191,4 +200,36 @@ struct DupIdxQueue {
|
||||||
}
|
}
|
||||||
return len - popNum;
|
return len - popNum;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
uint64_t RealSize() {
|
||||||
|
uint64_t len = 0;
|
||||||
|
auto preTop = minHeap.top();
|
||||||
|
DupInfo dupIdx = this->Pop();
|
||||||
|
DupInfo nextDup = dupIdx;
|
||||||
|
auto topIdx = minHeap.top();
|
||||||
|
|
||||||
|
ofstream ofs("dupn-noxyz.txt");
|
||||||
|
|
||||||
|
while (dupIdx != -1) {
|
||||||
|
++len;
|
||||||
|
while (true) {
|
||||||
|
topIdx = minHeap.top();
|
||||||
|
nextDup = this->Pop();
|
||||||
|
if (nextDup != dupIdx) {
|
||||||
|
dupIdx = nextDup;
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
cout << "the same dup: " << dupIdx << '\t' << preTop.arrId << '\t' << preTop.arrIdx << '\t'
|
||||||
|
<< preTop.dupIdx << '\t' << topIdx.arrId << '\t' << topIdx.arrIdx << '\t' << topIdx.dupIdx
|
||||||
|
<< endl;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//ofs << topIdx.arrId << '\t' << topIdx.arrIdx << '\t' << topIdx.dupIdx << endl;
|
||||||
|
ofs << topIdx.dupIdx << endl;
|
||||||
|
dupIdx = nextDup;
|
||||||
|
preTop = topIdx;
|
||||||
|
}
|
||||||
|
ofs.close();
|
||||||
|
return len;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
File diff suppressed because it is too large
Load Diff
|
|
@ -0,0 +1,264 @@
|
||||||
|
#pragma once
|
||||||
|
|
||||||
|
#include <inttypes.h>
|
||||||
|
#include <spdlog/spdlog.h>
|
||||||
|
#include <util/yarn.h>
|
||||||
|
|
||||||
|
#include "md_types.h"
|
||||||
|
|
||||||
|
struct ReadData {
|
||||||
|
vector<BamWrap *> bams; // read step output
|
||||||
|
int64_t bamStartIdx = 0; // 每轮读入的bam数组,起始位置在全局bam中的索引
|
||||||
|
int64_t taskSeq = 0; // 任务序号
|
||||||
|
};
|
||||||
|
|
||||||
|
struct GenREData {
|
||||||
|
vector<vector<ReadEnds>> pairsArr; // 成对的reads
|
||||||
|
vector<vector<ReadEnds>> fragsArr; // 暂未找到配对的reads
|
||||||
|
vector<UnpairedNameMap> unpairedDicArr; // 用来寻找pair end
|
||||||
|
void Init(int nThread) {
|
||||||
|
for (int i = 0; i <= nThread; ++i) { // 比线程多一个,主要是pairs多一个,其他没用
|
||||||
|
pairsArr.push_back(vector<ReadEnds>());
|
||||||
|
fragsArr.push_back(vector<ReadEnds>());
|
||||||
|
unpairedDicArr.push_back(UnpairedNameMap());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
UnpairedNameMap unpairedDic; // 代替sort step中一部分计算
|
||||||
|
size_t byteSize() {
|
||||||
|
size_t bytes = 0;
|
||||||
|
for (auto &v : pairsArr)
|
||||||
|
for (auto &r : v) bytes += sizeof(r);
|
||||||
|
for (auto &v : fragsArr)
|
||||||
|
for (auto &r : v) bytes += sizeof(r);
|
||||||
|
for (auto &m : unpairedDicArr) bytes += m.size() * 100;
|
||||||
|
bytes += unpairedDic.size() * 100;
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct SortMarkData {
|
||||||
|
vector<ReadEnds> pairs; // 成对的reads
|
||||||
|
vector<ReadEnds> frags; // 暂未找到配对的reads
|
||||||
|
UnpairedNameMap unpairedDic; // 用来寻找pair end
|
||||||
|
size_t byteSize() {
|
||||||
|
size_t bytes = 0;
|
||||||
|
for (auto &r : pairs) bytes += sizeof(r);
|
||||||
|
for (auto &r : frags) bytes += sizeof(r);
|
||||||
|
bytes += unpairedDic.size() * 100;
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct SortData {
|
||||||
|
volatile void *dataPtr; // SortMarkData pointer
|
||||||
|
};
|
||||||
|
|
||||||
|
struct MarkDupData {
|
||||||
|
int64_t taskSeq = 0; // 任务序号
|
||||||
|
DPSet<DupInfo> pairDupIdx; // pair的冗余read的索引
|
||||||
|
MDSet<int64_t> pairOpticalDupIdx; // optical冗余read的索引
|
||||||
|
DPSet<DupInfo> fragDupIdx; // frag的冗余read的索引
|
||||||
|
DPSet<DupInfo> pairRepIdx; // pair的dupset代表read的索引
|
||||||
|
CkeyReadEndsMap ckeyReadEndsMap;
|
||||||
|
|
||||||
|
volatile void *dataPtr; // SortMarkData pointer
|
||||||
|
|
||||||
|
void clear() {
|
||||||
|
fragDupIdx.clear();
|
||||||
|
pairDupIdx.clear();
|
||||||
|
pairOpticalDupIdx.clear();
|
||||||
|
pairRepIdx.clear();
|
||||||
|
ckeyReadEndsMap.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t byteSize() {
|
||||||
|
size_t bytes = 0;
|
||||||
|
bytes += pairDupIdx.size() * 100;
|
||||||
|
bytes += pairOpticalDupIdx.size() * 100;
|
||||||
|
bytes += fragDupIdx.size() * 100;
|
||||||
|
bytes += pairRepIdx.size() * 100;
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct DupResult {
|
||||||
|
vector<vector<DupInfo>> dupIdxArr;
|
||||||
|
vector<vector<int64_t>> opticalDupIdxArr;
|
||||||
|
vector<vector<DupInfo>> repIdxArr;
|
||||||
|
size_t byteSize() {
|
||||||
|
size_t bytes = 0;
|
||||||
|
size_t tmp = 0;
|
||||||
|
for (auto &v : dupIdxArr)
|
||||||
|
for (auto &r : v) tmp += sizeof(r);
|
||||||
|
spdlog::info("dupIdxArr size : {} GB", tmp / 1024.0 / 1024 / 1024);
|
||||||
|
bytes += tmp;
|
||||||
|
tmp = 0;
|
||||||
|
for (auto &v : opticalDupIdxArr)
|
||||||
|
for (auto &r : v) tmp += sizeof(r);
|
||||||
|
spdlog::info("opticalDupIdxArr size : {} GB", tmp / 1024.0 / 1024 / 1024);
|
||||||
|
bytes += tmp;
|
||||||
|
tmp = 0;
|
||||||
|
for (auto &v : repIdxArr)
|
||||||
|
for (auto &r : v) tmp += sizeof(r);
|
||||||
|
spdlog::info("repIdxArr size : {} GB", tmp / 1024.0 / 1024 / 1024);
|
||||||
|
bytes += tmp;
|
||||||
|
spdlog::info("result size : {} GB", bytes / 1024.0 / 1024 / 1024);
|
||||||
|
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct IntersectData {
|
||||||
|
UnpairedNameMap unpairedDic; // 用来寻找pair end
|
||||||
|
CkeyReadEndsMap ckeyReadEndsMap;
|
||||||
|
|
||||||
|
// 每个task对应一个vector
|
||||||
|
vector<vector<DupInfo>> &dupIdxArr;
|
||||||
|
vector<vector<int64_t>> &opticalDupIdxArr;
|
||||||
|
vector<vector<DupInfo>> &repIdxArr;
|
||||||
|
|
||||||
|
IntersectData(DupResult *resPtr)
|
||||||
|
: dupIdxArr(resPtr->dupIdxArr), opticalDupIdxArr(resPtr->opticalDupIdxArr), repIdxArr(resPtr->repIdxArr) {}
|
||||||
|
|
||||||
|
size_t byteSize() {
|
||||||
|
size_t bytes = 0;
|
||||||
|
bytes += unpairedDic.size() * 100;
|
||||||
|
for (auto &v : dupIdxArr)
|
||||||
|
for (auto &r : v) bytes += sizeof(r);
|
||||||
|
for (auto &v : opticalDupIdxArr)
|
||||||
|
for (auto &r : v) bytes += sizeof(r);
|
||||||
|
for (auto &v : repIdxArr)
|
||||||
|
for (auto &r : v) bytes += sizeof(r);
|
||||||
|
spdlog::info("result size : {}", bytes);
|
||||||
|
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// 记录流水线状态,task的序号,以及某阶段是否结束
|
||||||
|
struct PipelineArg {
|
||||||
|
static const int GENBUFNUM = 2;
|
||||||
|
static const int SORTBUFNUM = 2;
|
||||||
|
static const int MARKBUFNUM = 4;
|
||||||
|
uint64_t readOrder = 0;
|
||||||
|
uint64_t genREOrder = 0;
|
||||||
|
uint64_t sortOrder = 0;
|
||||||
|
uint64_t markDupOrder = 0;
|
||||||
|
uint64_t intersectOrder = 0;
|
||||||
|
int numThread = 0;
|
||||||
|
|
||||||
|
volatile int readFinish = 0;
|
||||||
|
volatile int genREFinish = 0;
|
||||||
|
volatile int sortFinish = 0;
|
||||||
|
volatile int markDupFinish = 0;
|
||||||
|
|
||||||
|
yarn::lock_t *readSig;
|
||||||
|
yarn::lock_t *genRESig;
|
||||||
|
yarn::lock_t *sortSig;
|
||||||
|
yarn::lock_t *markDupSig;
|
||||||
|
|
||||||
|
PipelineArg(DupResult *resPtr) : intersectData(resPtr) {
|
||||||
|
readSig = yarn::NEW_LOCK(0); // 最大值1, 双buffer在bambuf中实现了,对调用透明
|
||||||
|
genRESig = yarn::NEW_LOCK(0); // 最大值2, 双buffer
|
||||||
|
sortSig = yarn::NEW_LOCK(0);
|
||||||
|
markDupSig = yarn::NEW_LOCK(0);
|
||||||
|
for (int i = 0; i < SORTBUFNUM; ++i) {
|
||||||
|
sortData[i].dataPtr = &sortMarkData[i];
|
||||||
|
}
|
||||||
|
for (int i = 0; i < MARKBUFNUM; ++i) {
|
||||||
|
markDupData[i].dataPtr = &sortMarkData[i + SORTBUFNUM];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SortMarkData sortMarkData[SORTBUFNUM + MARKBUFNUM];
|
||||||
|
|
||||||
|
// for step-1 read
|
||||||
|
ReadData readData;
|
||||||
|
// for step-2 generate readends
|
||||||
|
GenREData genREData[GENBUFNUM];
|
||||||
|
// for step-3 sort each thread frags and pairs
|
||||||
|
SortData sortData[SORTBUFNUM];
|
||||||
|
// for step-4 mark duplicate
|
||||||
|
MarkDupData markDupData[MARKBUFNUM];
|
||||||
|
// for step-5 deal with intersect data
|
||||||
|
IntersectData intersectData;
|
||||||
|
|
||||||
|
size_t byteSize() {
|
||||||
|
size_t bytes = 0;
|
||||||
|
|
||||||
|
size_t tmp = 0;
|
||||||
|
for (int i = 0; i < SORTBUFNUM + MARKBUFNUM; ++i) tmp += sortMarkData[i].byteSize();
|
||||||
|
bytes += tmp;
|
||||||
|
spdlog::info("sortMarkData size : {}", tmp);
|
||||||
|
for (int i = 0; i < GENBUFNUM; ++i) tmp += genREData[i].byteSize();
|
||||||
|
bytes += tmp;
|
||||||
|
spdlog::info("genREData size : {}", tmp);
|
||||||
|
for (int i = 0; i < MARKBUFNUM; ++i) tmp += markDupData[i].byteSize();
|
||||||
|
bytes += tmp;
|
||||||
|
spdlog::info("markDupData size : {}", tmp);
|
||||||
|
tmp += intersectData.byteSize();
|
||||||
|
bytes += tmp;
|
||||||
|
spdlog::info("intersectData size : {}", tmp);
|
||||||
|
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct REArrIdIdx {
|
||||||
|
int arrId = 0; // 数组索引
|
||||||
|
uint64_t arrIdx = 0; // 数组内部当前索引
|
||||||
|
const ReadEnds *pE = nullptr;
|
||||||
|
};
|
||||||
|
|
||||||
|
struct REGreaterThan {
|
||||||
|
bool operator()(const REArrIdIdx &a, const REArrIdIdx &b) { return *b.pE < *a.pE; }
|
||||||
|
};
|
||||||
|
|
||||||
|
struct ReadEndsHeap {
|
||||||
|
// 将冗余索引和他对应的task vector对应起来
|
||||||
|
vector<vector<ReadEnds>> *arr2d;
|
||||||
|
priority_queue<REArrIdIdx, vector<REArrIdIdx>, REGreaterThan> minHeap;
|
||||||
|
uint64_t popNum = 0;
|
||||||
|
|
||||||
|
int Init(vector<vector<ReadEnds>> *_arr2d) {
|
||||||
|
arr2d = _arr2d;
|
||||||
|
if (arr2d == nullptr) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
for (int i = 0; i < arr2d->size(); ++i) {
|
||||||
|
auto &v = (*arr2d)[i];
|
||||||
|
if (!v.empty()) {
|
||||||
|
minHeap.push({i, 1, &v[0]});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
const ReadEnds *Pop() {
|
||||||
|
const ReadEnds *ret = nullptr;
|
||||||
|
if (!minHeap.empty()) {
|
||||||
|
auto minVal = minHeap.top();
|
||||||
|
minHeap.pop();
|
||||||
|
++popNum;
|
||||||
|
ret = minVal.pE;
|
||||||
|
auto &v = (*arr2d)[minVal.arrId];
|
||||||
|
if (v.size() > minVal.arrIdx) {
|
||||||
|
minHeap.push({minVal.arrId, minVal.arrIdx + 1, &v[minVal.arrIdx]});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
uint64_t Size() {
|
||||||
|
uint64_t len = 0;
|
||||||
|
if (arr2d != nullptr) {
|
||||||
|
for (auto &v : *arr2d) {
|
||||||
|
len += v.size();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return len - popNum;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// 并行运行mark duplicate
|
||||||
|
void NewPipeMarkDups();
|
||||||
|
|
@ -26,8 +26,8 @@ extern samFile *gInBamFp; // 输入的bam文件
|
||||||
extern sam_hdr_t *gInBamHeader; // 输入的bam文件头信息
|
extern sam_hdr_t *gInBamHeader; // 输入的bam文件头信息
|
||||||
extern DuplicationMetrics gMetrics; // 统计信息
|
extern DuplicationMetrics gMetrics; // 统计信息
|
||||||
extern vector<ReadNameParser> gNameParsers;
|
extern vector<ReadNameParser> gNameParsers;
|
||||||
|
extern DupResult gDupRes;
|
||||||
extern PipelineArg gPipe;
|
extern PipelineArg gPipe;
|
||||||
|
|
||||||
}; // namespace nsgv
|
}; // namespace nsgv
|
||||||
|
|
||||||
/* 排序 */
|
/* 排序 */
|
||||||
|
|
@ -304,9 +304,13 @@ static void refeshTaskDupInfo(DPSet<DupInfo> &dupIdx, MDSet<int64_t> &opticalDup
|
||||||
/* 最后合并数据并排序 */
|
/* 最后合并数据并排序 */
|
||||||
template <typename DupContainer, typename T>
|
template <typename DupContainer, typename T>
|
||||||
static void refeshFinalTaskDupInfo(DupContainer &dupIdx, MDSet<int64_t> ¬DupIdx, vector<T> &dupArr,
|
static void refeshFinalTaskDupInfo(DupContainer &dupIdx, MDSet<int64_t> ¬DupIdx, vector<T> &dupArr,
|
||||||
vector<T> &cacheDupIdx, vector<T> &midArr) {
|
vector<T> &cacheDupIdx1, vector<T> &midArr1) {
|
||||||
midArr.resize(0);
|
//midArr.resize(0);
|
||||||
cacheDupIdx.resize(0);
|
//cacheDupIdx.resize(0);
|
||||||
|
|
||||||
|
vector<T> cacheDupIdx;
|
||||||
|
vector<T> midArr;
|
||||||
|
|
||||||
cacheDupIdx.insert(cacheDupIdx.end(), dupIdx.begin(), dupIdx.end());
|
cacheDupIdx.insert(cacheDupIdx.end(), dupIdx.begin(), dupIdx.end());
|
||||||
std::sort(cacheDupIdx.begin(), cacheDupIdx.end());
|
std::sort(cacheDupIdx.begin(), cacheDupIdx.end());
|
||||||
|
|
||||||
|
|
@ -341,7 +345,11 @@ static void refeshFinalTaskDupInfo(DupContainer &dupIdx, MDSet<int64_t> ¬DupI
|
||||||
midArr.push_back(val);
|
midArr.push_back(val);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
dupArr = midArr;
|
// spdlog::info("midArr & dupArr size: {}-{}", midArr.size(), dupArr.size());
|
||||||
|
//dupArr = midArr;
|
||||||
|
dupArr.clear();
|
||||||
|
dupArr.assign(midArr.begin(), midArr.end());
|
||||||
|
spdlog::info("midArr & dupArr size: {}-{}", midArr.size(), dupArr.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
// for step 2 generate read ends
|
// for step 2 generate read ends
|
||||||
|
|
@ -1003,7 +1011,7 @@ static void mergeAllTask(PipelineArg &pipeArg) {
|
||||||
g.unpairedPosArr.clear();
|
g.unpairedPosArr.clear();
|
||||||
PROF_END(gprof[GP_merge_markdup], merge_markdup);
|
PROF_END(gprof[GP_merge_markdup], merge_markdup);
|
||||||
|
|
||||||
// 将dupidx放进全局数据
|
// // 将dupidx放进全局数据
|
||||||
PROF_START(merge_update);
|
PROF_START(merge_update);
|
||||||
vector<DupInfo> cacheDupIdx;
|
vector<DupInfo> cacheDupIdx;
|
||||||
vector<DupInfo> midArr;
|
vector<DupInfo> midArr;
|
||||||
|
|
@ -1039,7 +1047,8 @@ static void mergeAllTask(PipelineArg &pipeArg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static void parallelPipeline() {
|
static void parallelPipeline() {
|
||||||
PipelineArg &pipeArg = nsgv::gPipe;
|
PipelineArg pipeArg(&nsgv::gDupRes);
|
||||||
|
//PipelineArg &pipeArg = nsgv::gPipe;
|
||||||
pipeArg.numThread = nsgv::gMdArg.NUM_THREADS;
|
pipeArg.numThread = nsgv::gMdArg.NUM_THREADS;
|
||||||
|
|
||||||
pthread_t tidArr[5];
|
pthread_t tidArr[5];
|
||||||
|
|
@ -1052,6 +1061,14 @@ static void parallelPipeline() {
|
||||||
PROF_START(merge_result);
|
PROF_START(merge_result);
|
||||||
mergeAllTask(pipeArg);
|
mergeAllTask(pipeArg);
|
||||||
PROF_END(gprof[GP_merge_result], merge_result);
|
PROF_END(gprof[GP_merge_result], merge_result);
|
||||||
|
|
||||||
|
spdlog::info("pipeArg size : {}", pipeArg.byteSize());
|
||||||
|
|
||||||
|
size_t repNum = 0;
|
||||||
|
for (auto &v : pipeArg.intersectData.repIdxArr) repNum += v.size();
|
||||||
|
spdlog::info("rep num : {}", repNum);
|
||||||
|
|
||||||
|
spdlog::info("result size : {}", nsgv::gDupRes.byteSize());
|
||||||
}
|
}
|
||||||
|
|
||||||
/* 并行流水线方式处理数据,标记冗余 */
|
/* 并行流水线方式处理数据,标记冗余 */
|
||||||
|
|
@ -1059,7 +1076,7 @@ void pipelineMarkDups() {
|
||||||
if (nsgv::gMdArg.NUM_THREADS > 1)
|
if (nsgv::gMdArg.NUM_THREADS > 1)
|
||||||
return parallelPipeline();
|
return parallelPipeline();
|
||||||
|
|
||||||
PipelineArg &pipeArg = nsgv::gPipe;
|
PipelineArg pipeArg(&nsgv::gDupRes);
|
||||||
pipeArg.numThread = nsgv::gMdArg.NUM_THREADS;
|
pipeArg.numThread = nsgv::gMdArg.NUM_THREADS;
|
||||||
BamBufType inBamBuf(nsgv::gMdArg.DUPLEX_IO);
|
BamBufType inBamBuf(nsgv::gMdArg.DUPLEX_IO);
|
||||||
inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gMdArg.MAX_MEM);
|
inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gMdArg.MAX_MEM);
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,7 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include <inttypes.h>
|
#include <inttypes.h>
|
||||||
|
#include <spdlog/spdlog.h>
|
||||||
#include <util/yarn.h>
|
#include <util/yarn.h>
|
||||||
|
|
||||||
#include "md_types.h"
|
#include "md_types.h"
|
||||||
|
|
@ -24,6 +25,15 @@ struct GenREData {
|
||||||
}
|
}
|
||||||
UnpairedNameMap unpairedDic; // 代替sort step中一部分计算
|
UnpairedNameMap unpairedDic; // 代替sort step中一部分计算
|
||||||
UnpairedPositionMap unpairedPosArr; //
|
UnpairedPositionMap unpairedPosArr; //
|
||||||
|
size_t byteSize() {
|
||||||
|
size_t bytes = 0;
|
||||||
|
for (auto &v : pairsArr) for (auto &r : v) bytes += sizeof(r);
|
||||||
|
for (auto &v : fragsArr) for (auto &r : v) bytes += sizeof(r);
|
||||||
|
for (auto &m : unpairedDicArr) bytes += m.size() * 100;
|
||||||
|
bytes += unpairedDic.size() * 100;
|
||||||
|
bytes += unpairedPosArr.size() * 1000;
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SortMarkData {
|
struct SortMarkData {
|
||||||
|
|
@ -31,6 +41,14 @@ struct SortMarkData {
|
||||||
vector<ReadEnds> frags; // 暂未找到配对的reads
|
vector<ReadEnds> frags; // 暂未找到配对的reads
|
||||||
UnpairedNameMap unpairedDic; // 用来寻找pair end
|
UnpairedNameMap unpairedDic; // 用来寻找pair end
|
||||||
UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd,为了避免重复存储
|
UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd,为了避免重复存储
|
||||||
|
size_t byteSize() {
|
||||||
|
size_t bytes = 0;
|
||||||
|
for (auto &r : pairs) bytes += sizeof(r);
|
||||||
|
for (auto &r : frags) bytes += sizeof(r);
|
||||||
|
bytes += unpairedDic.size() * 100;
|
||||||
|
bytes += unpairedPosArr.size() * 1000;
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
struct SortData {
|
struct SortData {
|
||||||
|
|
@ -45,6 +63,40 @@ struct MarkDupData {
|
||||||
DPSet<DupInfo> pairRepIdx; // pair的dupset代表read的索引
|
DPSet<DupInfo> pairRepIdx; // pair的dupset代表read的索引
|
||||||
|
|
||||||
volatile void *dataPtr; // SortMarkData pointer
|
volatile void *dataPtr; // SortMarkData pointer
|
||||||
|
|
||||||
|
size_t byteSize() {
|
||||||
|
size_t bytes = 0;
|
||||||
|
bytes += pairDupIdx.size() * 100;
|
||||||
|
bytes += pairOpticalDupIdx.size() * 100;
|
||||||
|
bytes += fragDupIdx.size() * 100;
|
||||||
|
bytes += pairRepIdx.size() * 100;
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
struct DupResult {
|
||||||
|
vector<vector<DupInfo>> dupIdxArr;
|
||||||
|
vector<vector<int64_t>> opticalDupIdxArr;
|
||||||
|
vector<vector<DupInfo>> repIdxArr;
|
||||||
|
size_t byteSize() {
|
||||||
|
size_t bytes = 0;
|
||||||
|
size_t tmp = 0;
|
||||||
|
for (auto &v : dupIdxArr) for (auto &r : v) tmp += sizeof(r);
|
||||||
|
spdlog::info("dupIdxArr size : {} GB", tmp / 1024.0 / 1024 / 1024);
|
||||||
|
bytes += tmp;
|
||||||
|
tmp = 0;
|
||||||
|
for (auto &v : opticalDupIdxArr)
|
||||||
|
for (auto &r : v) tmp += sizeof(r);
|
||||||
|
spdlog::info("opticalDupIdxArr size : {} GB", tmp / 1024.0 / 1024 / 1024);
|
||||||
|
bytes += tmp;
|
||||||
|
tmp = 0;
|
||||||
|
for (auto &v : repIdxArr) for (auto &r : v) tmp += sizeof(r);
|
||||||
|
spdlog::info("repIdxArr size : {} GB", tmp / 1024.0 / 1024 / 1024);
|
||||||
|
bytes += tmp;
|
||||||
|
spdlog::info("result size : {} GB", bytes / 1024.0 / 1024 / 1024);
|
||||||
|
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
struct IntersectData {
|
struct IntersectData {
|
||||||
|
|
@ -52,9 +104,9 @@ struct IntersectData {
|
||||||
UnpairedPositionMap unpairedPosArr;
|
UnpairedPositionMap unpairedPosArr;
|
||||||
|
|
||||||
// 每个task对应一个vector
|
// 每个task对应一个vector
|
||||||
vector<vector<DupInfo>> dupIdxArr;
|
vector<vector<DupInfo>> &dupIdxArr;
|
||||||
vector<vector<int64_t>> opticalDupIdxArr;
|
vector<vector<int64_t>> &opticalDupIdxArr;
|
||||||
vector<vector<DupInfo>> repIdxArr;
|
vector<vector<DupInfo>> &repIdxArr;
|
||||||
|
|
||||||
// 用来存放后续计算的数据
|
// 用来存放后续计算的数据
|
||||||
vector<DPSet<DupInfo>> latterDupIdxArr;
|
vector<DPSet<DupInfo>> latterDupIdxArr;
|
||||||
|
|
@ -63,6 +115,31 @@ struct IntersectData {
|
||||||
vector<MDSet<int64_t>> latterNotDupIdxArr;
|
vector<MDSet<int64_t>> latterNotDupIdxArr;
|
||||||
vector<MDSet<int64_t>> latterNotOpticalDupIdxArr;
|
vector<MDSet<int64_t>> latterNotOpticalDupIdxArr;
|
||||||
vector<MDSet<int64_t>> latterNotRepIdxArr;
|
vector<MDSet<int64_t>> latterNotRepIdxArr;
|
||||||
|
|
||||||
|
IntersectData(DupResult *resPtr) :
|
||||||
|
dupIdxArr(resPtr->dupIdxArr),
|
||||||
|
opticalDupIdxArr(resPtr->opticalDupIdxArr),
|
||||||
|
repIdxArr(resPtr->repIdxArr)
|
||||||
|
{}
|
||||||
|
|
||||||
|
size_t byteSize() {
|
||||||
|
size_t bytes = 0;
|
||||||
|
bytes += unpairedDic.size() * 100;
|
||||||
|
bytes += unpairedPosArr.size() * 1000;
|
||||||
|
for (auto &v : dupIdxArr) for (auto &r : v) bytes += sizeof(r);
|
||||||
|
for (auto &v : opticalDupIdxArr) for (auto &r : v) bytes += sizeof(r);
|
||||||
|
for (auto &v : repIdxArr) for (auto &r : v) bytes += sizeof(r);
|
||||||
|
spdlog::info("result size : {}", bytes);
|
||||||
|
|
||||||
|
for (auto &s : latterDupIdxArr) bytes += s.size() * sizeof(DupInfo);
|
||||||
|
for (auto &s : latterOpticalDupIdxArr) bytes += s.size() * sizeof(DupInfo);
|
||||||
|
for (auto &s : latterRepIdxArr) bytes += s.size() * sizeof(DupInfo);
|
||||||
|
for (auto &s : latterNotDupIdxArr) bytes += s.size() * sizeof(DupInfo);
|
||||||
|
for (auto &s : latterNotOpticalDupIdxArr) bytes += s.size() * sizeof(DupInfo);
|
||||||
|
for (auto &s : latterNotRepIdxArr) bytes += s.size() * sizeof(DupInfo);
|
||||||
|
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// 记录流水线状态,task的序号,以及某阶段是否结束
|
// 记录流水线状态,task的序号,以及某阶段是否结束
|
||||||
|
|
@ -87,7 +164,7 @@ struct PipelineArg {
|
||||||
yarn::lock_t *sortSig;
|
yarn::lock_t *sortSig;
|
||||||
yarn::lock_t *markDupSig;
|
yarn::lock_t *markDupSig;
|
||||||
|
|
||||||
PipelineArg() {
|
PipelineArg(DupResult *resPtr) : intersectData(resPtr) {
|
||||||
readSig = yarn::NEW_LOCK(0); // 最大值1, 双buffer在bambuf中实现了,对调用透明
|
readSig = yarn::NEW_LOCK(0); // 最大值1, 双buffer在bambuf中实现了,对调用透明
|
||||||
genRESig = yarn::NEW_LOCK(0); // 最大值2, 双buffer
|
genRESig = yarn::NEW_LOCK(0); // 最大值2, 双buffer
|
||||||
sortSig = yarn::NEW_LOCK(0);
|
sortSig = yarn::NEW_LOCK(0);
|
||||||
|
|
@ -112,6 +189,26 @@ struct PipelineArg {
|
||||||
MarkDupData markDupData[MARKBUFNUM];
|
MarkDupData markDupData[MARKBUFNUM];
|
||||||
// for step-5 deal with intersect data
|
// for step-5 deal with intersect data
|
||||||
IntersectData intersectData;
|
IntersectData intersectData;
|
||||||
|
|
||||||
|
size_t byteSize() {
|
||||||
|
size_t bytes = 0;
|
||||||
|
|
||||||
|
size_t tmp = 0;
|
||||||
|
for (int i = 0; i < SORTBUFNUM + MARKBUFNUM; ++i) tmp += sortMarkData[i].byteSize();
|
||||||
|
bytes += tmp;
|
||||||
|
spdlog::info("sortMarkData size : {}", tmp);
|
||||||
|
for (int i = 0; i < GENBUFNUM; ++i) tmp += genREData[i].byteSize();
|
||||||
|
bytes += tmp;
|
||||||
|
spdlog::info("genREData size : {}", tmp);
|
||||||
|
for (int i = 0; i < MARKBUFNUM; ++i) tmp += markDupData[i].byteSize();
|
||||||
|
bytes += tmp;
|
||||||
|
spdlog::info("markDupData size : {}", tmp);
|
||||||
|
tmp += intersectData.byteSize();
|
||||||
|
bytes += tmp;
|
||||||
|
spdlog::info("intersectData size : {}", tmp);
|
||||||
|
|
||||||
|
return bytes;
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
struct REArrIdIdx {
|
struct REArrIdIdx {
|
||||||
|
|
|
||||||
|
|
@ -78,7 +78,7 @@ struct ReadEnds : PhysicalLocation {
|
||||||
|
|
||||||
/* zzh增加的成员变量 */
|
/* zzh增加的成员变量 */
|
||||||
int64_t posKey = -1; // 根据位置信息生成的关键字 return (int64_t)tid <<
|
int64_t posKey = -1; // 根据位置信息生成的关键字 return (int64_t)tid <<
|
||||||
// MAX_CONTIG_LEN_SHIFT | (int64_t)pos;
|
// MAX_CONTIG_LEN_SHIFT | (int64_t)pos; (包含clip的序列,也就是可能比map结果更偏左)
|
||||||
|
|
||||||
/* 用来做一些判断,因为一些readends会做多次操作,比如task之间有重叠等等 */
|
/* 用来做一些判断,因为一些readends会做多次操作,比如task之间有重叠等等 */
|
||||||
int oprateTime = 0;
|
int oprateTime = 0;
|
||||||
|
|
@ -148,12 +148,12 @@ struct ReadEnds : PhysicalLocation {
|
||||||
comp = read2ReferenceIndex - o.read2ReferenceIndex;
|
comp = read2ReferenceIndex - o.read2ReferenceIndex;
|
||||||
if (comp == 0)
|
if (comp == 0)
|
||||||
comp = read2Coordinate - o.read2Coordinate;
|
comp = read2Coordinate - o.read2Coordinate;
|
||||||
if (comp == 0)
|
//if (comp == 0)
|
||||||
comp = tile - o.tile;
|
// comp = tile - o.tile;
|
||||||
if (comp == 0)
|
//if (comp == 0)
|
||||||
comp = x - o.x;
|
// comp = x - o.x;
|
||||||
if (comp == 0)
|
//if (comp == 0)
|
||||||
comp - y - o.y;
|
// comp - y - o.y;
|
||||||
if (comp == 0)
|
if (comp == 0)
|
||||||
comp = (int)(read1IndexInFile - o.read1IndexInFile);
|
comp = (int)(read1IndexInFile - o.read1IndexInFile);
|
||||||
if (comp == 0)
|
if (comp == 0)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue