pipeline模式,比parallel快,对na12878数据有bug

This commit is contained in:
zzh 2024-11-18 17:45:23 +08:00
parent 9688710573
commit 504e11331c
9 changed files with 2163 additions and 72 deletions

2
.vscode/launch.json vendored
View File

@ -16,7 +16,7 @@
"--INPUT", "~/data/bam/100w.sam", "--INPUT", "~/data/bam/100w.sam",
"--OUTPUT", "./out.sam", "--OUTPUT", "./out.sam",
"--METRICS_FILE", "metrics.txt", "--METRICS_FILE", "metrics.txt",
"--num_threads", "1", "--num_threads", "2",
"--max_mem", "2M", "--max_mem", "2M",
"--verbosity", "DEBUG", "--verbosity", "DEBUG",
"--asyncio", "true", "--asyncio", "true",

View File

@ -102,6 +102,7 @@
"stack": "cpp", "stack": "cpp",
"__bit_reference": "cpp", "__bit_reference": "cpp",
"__string": "cpp", "__string": "cpp",
"filesystem": "cpp" "filesystem": "cpp",
"ios": "cpp"
} }
} }

10
run.sh
View File

@ -6,26 +6,28 @@
nthread=32 nthread=32
#nthread=64 #nthread=64
#nthread=128 #nthread=128
#input=~/data/bam/zy_normal.bam #input=/home/zzh/data/bam/zy_normal.bam
input=~/data/bam/zy_tumor.bam #input=/home/zzh/data/bam/zy_tumor.bam
input=/home/zzh/data/wgs_na12878.bam
#input=~/data/bam/100w.bam #input=~/data/bam/100w.bam
#input=~/data/bam/t100w.sam #input=~/data/bam/t100w.sam
#input=~/data/bam/1k.sam #input=~/data/bam/1k.sam
#input=~/data/bam/1kw.sam #input=~/data/bam/1kw.sam
#input=~/data/bam/1kw.bam #input=~/data/bam/1kw.bam
#input=~/data/bam/n1kw.sam #input=~/data/bam/n1kw.sam
output=/home/zzh/data1/na12878_out.bam
cd ./build/ && make -j 8 && cd .. cd ./build/ && make -j 8 && cd ..
#./out.sam \ #./out.sam \
time /home/zzh/work/ngs/picard_cpp/build/bin/picard_cpp \ time /home/zzh/work/ngs/picard_cpp/build/bin/picard_cpp \
MarkDuplicates \ MarkDuplicates \
--INPUT $input \ --INPUT $input \
--OUTPUT ./out.bam \ --OUTPUT $output \
--INDEX_FORMAT BAI \ --INDEX_FORMAT BAI \
--num_threads $nthread \ --num_threads $nthread \
--max_mem 2G \ --max_mem 2G \
--verbosity DEBUG \ --verbosity DEBUG \
--asyncio true -TAG_DUPLICATE_SET_MEMBERS true #--READ_NAME_REGEX null # --asyncio true -TAG_DUPLICATE_SET_MEMBERS true --READ_NAME_REGEX null #
#--TAG_DUPLICATE_SET_MEMBERS true #--READ_NAME_REGEX "" #--TAG_DUPLICATE_SET_MEMBERS true #--READ_NAME_REGEX ""
#--READ_NAME_REGEX ".*?([0-9]+):([0-9]+):([0-9]+)$" #--READ_NAME_REGEX ".*?([0-9]+):([0-9]+):([0-9]+)$"
#--READ_NAME_REGEX "" #--READ_NAME_REGEX ""

View File

@ -104,6 +104,7 @@
may exit to abort the application, or if it returns, the yarn error may exit to abort the application, or if it returns, the yarn error
handler will exit (set to NULL by default for no action) handler will exit (set to NULL by default for no action)
*/ */
#pragma once
extern char *yarn_prefix; extern char *yarn_prefix;
extern void (*yarn_abort)(int); extern void (*yarn_abort)(int);
@ -135,5 +136,6 @@ enum wait_op {
void wait_for_(lock_t *, enum wait_op, long, char const *, long); void wait_for_(lock_t *, enum wait_op, long, char const *, long);
#define WAIT_FOR(a, b, c) wait_for_(a, b, c, __FILE__, __LINE__) #define WAIT_FOR(a, b, c) wait_for_(a, b, c, __FILE__, __LINE__)
long peek_lock(lock_t *); long peek_lock(lock_t *);
#define PEEK_LOCK(a) peek_lock(a)
void free_lock_(lock_t *, char const *, long); void free_lock_(lock_t *, char const *, long);
#define FREE_LOCK(a) free_lock_(a, __FILE__, __LINE__) #define FREE_LOCK(a) free_lock_(a, __FILE__, __LINE__)

View File

@ -22,15 +22,16 @@ Date : 2023/10/23
#include <iostream> #include <iostream>
#include <queue> #include <queue>
#include <set> #include <set>
#include <string>
#include <unordered_map> #include <unordered_map>
#include <unordered_set> #include <unordered_set>
#include <vector> #include <vector>
#include <string>
#include "dup_metrics.h" #include "dup_metrics.h"
#include "markdups_arg.h" #include "markdups_arg.h"
#include "md_funcs.h" #include "md_funcs.h"
#include "parallel_md.h" #include "parallel_md.h"
#include "pipeline_md.h"
#include "serial_md.h" #include "serial_md.h"
#include "shared_args.h" #include "shared_args.h"
@ -176,12 +177,14 @@ int MarkDuplicates(int argc, char *argv[]) {
hts_set_opt(g_outBamFp, HTS_OPT_THREAD_POOL, &htsPoolWrite); hts_set_opt(g_outBamFp, HTS_OPT_THREAD_POOL, &htsPoolWrite);
/* 冗余检查和标记 */ /* 冗余检查和标记 */
if (g_gArg.num_threads == 1) { // if (g_gArg.num_threads == 1) {
// serialMarkDups(); // 串行运行 // // serialMarkDups(); // 串行运行
parallelMarkDups(); // 并行运行 // parallelMarkDups(); // 并行运行
} else { // } else {
parallelMarkDups(); // 并行运行 // parallelMarkDups(); // 并行运行
} // }
pipelineMarkDups();
//parallelMarkDups();
/* 标记冗余, 将处理后的结果写入文件 */ /* 标记冗余, 将处理后的结果写入文件 */
sam_close(g_inBamFp); // 重新打开bam文件 sam_close(g_inBamFp); // 重新打开bam文件

View File

@ -136,13 +136,16 @@ struct ParallelData {
vector<ReadEnds> frags; // 暂未找到配对的reads vector<ReadEnds> frags; // 暂未找到配对的reads
UnpairedNameMap unpairedDic; // 用来寻找pair end UnpairedNameMap unpairedDic; // 用来寻找pair end
}; };
struct ParallelDataArg { struct ParallelDataArg {
vector<ParallelData> threadData; vector<ParallelData> threadData;
vector<vector<ReadEnds>*> pairs2d; // 成对的reads vector<vector<ReadEnds>*> pairs2d; // 成对的reads
vector<vector<ReadEnds>*> frags2d; // 暂未找到配对的reads vector<vector<ReadEnds>*> frags2d; // 暂未找到配对的reads
MarkDupDataArg *mdArg; MarkDupDataArg *mdArg;
int numThread;
void Init(int nThread) { void Init(int nThread) {
for (int i = 0; i <= nThread; ++i) { // pairs需要多一个数组 numThread = nThread;
for (int i = 0; i <= nThread; ++i) { // pairs需要多一个数组
threadData.push_back(ParallelData()); threadData.push_back(ParallelData());
} }
} }

View File

@ -6,6 +6,7 @@
#include <common/utils/util.h> #include <common/utils/util.h>
#include <sam/utils/read_ends.h> #include <sam/utils/read_ends.h>
#include <klib/kthread.h> #include <klib/kthread.h>
#include <pthread.h>
#include <algorithm> #include <algorithm>
#include <iostream> #include <iostream>
@ -264,18 +265,61 @@ static void threadGenerateReadEnds(void *data, long idx, int tid) {
// cout << "tid: " << tid << " end\t" << p.pairs.size() << "\t" << p.frags.size() << endl; // cout << "tid: " << tid << " end\t" << p.pairs.size() << "\t" << p.frags.size() << endl;
} }
static void *threadSortParallelPairs(void *data) {
auto &pd = *(ParallelDataArg *)data;
vector<ReadEnds> &pairs = pd.threadData[pd.numThread].pairs;
sort(pairs.begin(), pairs.end());
// 合并所有pairs
pd.pairs2d.clear();
for (int i = 0; i < pd.numThread; ++i) {
pd.pairs2d.push_back(&pd.threadData[i].pairs);
}
pd.pairs2d.push_back(&pd.threadData[pd.numThread].pairs);
ReadEndsQueue pairsQue;
pairsQue.Init(&pd.pairs2d);
const ReadEnds *pRE;
while ((pRE = pairsQue.Pop()) != nullptr) {
pd.mdArg->pairs.push_back(*pRE);
}
return nullptr;
}
static void *threadSortParallelFrags(void *data) {
auto &pd = *(ParallelDataArg *)data;
ReadEndsQueue fragsQue;
const ReadEnds *pRE;
pd.frags2d.clear();
for (int i = 0; i < pd.numThread; ++i) {
pd.frags2d.push_back(&pd.threadData[i].frags);
}
fragsQue.Init(&pd.frags2d);
while ((pRE = fragsQue.Pop()) != nullptr) {
pd.mdArg->frags.push_back(*pRE);
}
return nullptr;
}
static void *threadCreateUnpairInfo(void *data) {
auto &pd = *(ParallelDataArg *)data;
auto &p = *pd.mdArg;
for (auto &e : p.unpairedDic) {
auto posKey = e.second.unpairedRE.posKey;
auto &unpairArrInfo = p.unpairedPosArr[posKey];
unpairArrInfo.unpairedNum++;
unpairArrInfo.taskSeq = p.taskSeq;
unpairArrInfo.readNameSet.insert(e.first);
}
return nullptr;
}
/* 单线程生成readends (第一步)*/ /* 单线程生成readends (第一步)*/
static void generateReadEnds(ParallelDataArg &pdArg, MarkDupDataArg *arg) { static void generateReadEnds(ParallelDataArg &pdArg, MarkDupDataArg *arg) {
auto &p = *arg; auto &p = *arg;
auto &rnParser = g_vRnParser[0];
p.pairs.clear(); p.pairs.clear();
p.frags.clear(); p.frags.clear();
p.unpairedDic.clear(); p.unpairedDic.clear();
p.unpairedPosArr.clear(); p.unpairedPosArr.clear();
// cout << "read num: " << p.bams.size() << endl;
pdArg.mdArg = arg; pdArg.mdArg = arg;
tm_arr[8].acc_start(); tm_arr[8].acc_start();
kt_for(g_gArg.num_threads, threadGenerateReadEnds, &pdArg, g_gArg.num_threads); kt_for(g_gArg.num_threads, threadGenerateReadEnds, &pdArg, g_gArg.num_threads);
@ -283,7 +327,7 @@ static void generateReadEnds(ParallelDataArg &pdArg, MarkDupDataArg *arg) {
// 合并各个线程数据 // 合并各个线程数据
// 查找未匹配的frags // 查找未匹配的frags
tm_arr[9].acc_start();
vector<ReadEnds> &pairs = pdArg.threadData[g_gArg.num_threads].pairs; vector<ReadEnds> &pairs = pdArg.threadData[g_gArg.num_threads].pairs;
pairs.clear(); pairs.clear();
for (int i = 0; i < g_gArg.num_threads; ++i) { for (int i = 0; i < g_gArg.num_threads; ++i) {
@ -300,61 +344,21 @@ static void generateReadEnds(ParallelDataArg &pdArg, MarkDupDataArg *arg) {
} }
} }
} }
sort(pairs.begin(), pairs.end());
// 合并所有frags和pairs
pdArg.frags2d.clear();
pdArg.pairs2d.clear();
for (int i = 0; i < g_gArg.num_threads; ++i) {
pdArg.frags2d.push_back(&pdArg.threadData[i].frags);
pdArg.pairs2d.push_back(&pdArg.threadData[i].pairs);
}
if (false) {
cout << "taskseq: " << p.taskSeq << endl;
for (int i = 0; i < g_gArg.num_threads; ++i) {
cout << "thread pair num: " << pdArg.threadData[i].pairs.size() << endl;
cout << "thread frag num: " << pdArg.threadData[i].frags.size() << endl;
cout << "thread dic num: " << pdArg.threadData[i].unpairedDic.size() << endl;
}
}
pdArg.pairs2d.push_back(&pdArg.threadData[g_gArg.num_threads].pairs);
ReadEndsQueue fragsQue, pairsQue;
fragsQue.Init(&pdArg.frags2d);
pairsQue.Init(&pdArg.pairs2d);
tm_arr[9].acc_start();
const ReadEnds *pRE;
while((pRE = fragsQue.Pop()) != nullptr) {
p.frags.push_back(*pRE);
//cout << pRE->posKey << endl;
}
tm_arr[9].acc_end(); tm_arr[9].acc_end();
tm_arr[10].acc_start();
while ((pRE = pairsQue.Pop()) != nullptr) {
p.pairs.push_back(*pRE);
//cout << pRE->posKey << endl;
}
// 记录位点上的未匹配的read个数
for (auto &e : p.unpairedDic) { tm_arr[11].acc_start();
auto posKey = e.second.unpairedRE.posKey; pthread_t tid;
auto &unpairArrInfo = p.unpairedPosArr[posKey]; pthread_create(&tid, nullptr, threadSortParallelFrags, &pdArg);
unpairArrInfo.unpairedNum++; // threadSortParallelFrags(&pdArg);
unpairArrInfo.taskSeq = p.taskSeq; // tm_arr[12].acc_start();
unpairArrInfo.readNameSet.insert(e.first); threadSortParallelPairs(&pdArg);
} // tm_arr[12].acc_end();
tm_arr[10].acc_end(); // tm_arr[13].acc_start();
// cout << "frags num: " << p.frags.size() << endl; threadCreateUnpairInfo(&pdArg);
// cout << "pairs num: " << p.pairs.size() << endl; // tm_arr[13].acc_end();
// cout << "dic num : " << p.unpairedDic.size() << endl; pthread_join(tid, nullptr);
// cout << "generate end" << endl; tm_arr[11].acc_end();
} }
/* 处理pairs */ /* 处理pairs */
@ -1210,6 +1214,9 @@ void parallelMarkDups() {
cout << "build ends : " << tm_arr[8].acc_seconds_elapsed() << endl; cout << "build ends : " << tm_arr[8].acc_seconds_elapsed() << endl;
cout << "sort frags : " << tm_arr[9].acc_seconds_elapsed() << endl; cout << "sort frags : " << tm_arr[9].acc_seconds_elapsed() << endl;
cout << "sort pairs : " << tm_arr[10].acc_seconds_elapsed() << endl; cout << "sort pairs : " << tm_arr[10].acc_seconds_elapsed() << endl;
cout << "t frags : " << tm_arr[11].acc_seconds_elapsed() << endl;
cout << "t pairs : " << tm_arr[12].acc_seconds_elapsed() << endl;
cout << "t unpair : " << tm_arr[13].acc_seconds_elapsed() << endl;
cout << "all : " << tm_arr[5].acc_seconds_elapsed() << endl; cout << "all : " << tm_arr[5].acc_seconds_elapsed() << endl;
cout << "optical size: " << opticalDupNum << "\t" << opticalDupNum / 2 << endl; cout << "optical size: " << opticalDupNum << "\t" << opticalDupNum / 2 << endl;

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,170 @@
#pragma once
#include <common/utils/yarn.h>
#include <inttypes.h>
#include "md_types.h"
struct ReadData {
vector<BamWrap *> bams; // read step output
int64_t bamStartIdx = 0; // 每轮读入的bam数组起始位置在全局bam中的索引
int64_t taskSeq = 0; // 任务序号
};
class SortData;
struct GenREData {
vector<vector<ReadEnds>> pairsArr; // 成对的reads
vector<vector<ReadEnds>> fragsArr; // 暂未找到配对的reads
vector<UnpairedNameMap> unpairedDicArr; // 用来寻找pair end
void Init(int nThread) {
for (int i = 0; i <= nThread; ++i) { // 比线程多一个主要是pairs多一个其他没用
pairsArr.push_back(vector<ReadEnds>());
fragsArr.push_back(vector<ReadEnds>());
unpairedDicArr.push_back(UnpairedNameMap());
}
}
};
struct SortMarkData {
vector<ReadEnds> pairs; // 成对的reads
vector<ReadEnds> frags; // 暂未找到配对的reads
UnpairedNameMap unpairedDic; // 用来寻找pair end
UnpairedPositionMap unpairedPosArr; // 存放未匹配的ReadEnd对应位点的所有ReadEnd为了避免重复存储
};
struct SortData {
volatile void *pSmd;
};
struct MarkDupData {
int64_t taskSeq = 0; // 任务序号
DPSet<DupInfo> pairDupIdx; // pair的冗余read的索引
MDSet<int64_t> pairOpticalDupIdx; // optical冗余read的索引
DPSet<DupInfo> fragDupIdx; // frag的冗余read的索引
DPSet<DupInfo> pairRepIdx; // pair的dupset代表read的索引
MDSet<int64_t> pairSingletonIdx; // 某位置只有一对read的所有read pair个数
volatile void *pSmd;
};
struct IntersectData {
UnpairedNameMap unpairedDic; // 用来寻找pair end
UnpairedPositionMap unpairedPosArr;
// 每个task对应一个vector
vector<vector<DupInfo>> dupIdxArr;
vector<vector<int64_t>> opticalDupIdxArr;
vector<vector<DupInfo>> repIdxArr;
vector<vector<int64_t>> singletonIdxArr;
// 用来存放后续计算的数据
vector<DPSet<DupInfo>> latterDupIdxArr;
vector<MDSet<int64_t>> latterOpticalDupIdxArr;
vector<DPSet<DupInfo>> latterRepIdxArr;
vector<MDSet<int64_t>> latterSingletonIdxArr;
vector<MDSet<int64_t>> latterNotDupIdxArr;
vector<MDSet<int64_t>> latterNotOpticalDupIdxArr;
vector<MDSet<int64_t>> latterNotRepIdxArr;
vector<MDSet<int64_t>> latterNotSingletonIdxArr;
};
// 记录流水线状态task的序号以及某阶段是否结束
struct PipelineArg {
static const int GENBUFNUM = 2;
static const int SORTBUFNUM = 2;
static const int MARKBUFNUM = 3;
uint64_t readOrder = 0;
uint64_t genREOrder = 0;
uint64_t sortOrder = 0;
uint64_t markDupOrder = 0;
uint64_t intersectOrder = 0;
volatile int readFinish = 0;
volatile int genREFinish = 0;
volatile int sortFinish = 0;
volatile int markDupFinish = 0;
lock_t *readSig;
lock_t *genRESig;
lock_t *sortSig;
lock_t *markDupSig;
PipelineArg() {
readSig = NEW_LOCK(0); // 最大值1, 双buffer在bambuf中实现了对调用透明
genRESig = NEW_LOCK(0); // 最大值2, 双buffer
sortSig = NEW_LOCK(0);
markDupSig = NEW_LOCK(0);
for (int i = 0; i < SORTBUFNUM; ++i) sortData[i].pSmd = &smData[i];
for (int i = 0; i < MARKBUFNUM; ++i) markDupData[i].pSmd = &smData[i + SORTBUFNUM];
}
// sort mark shared data
SortMarkData smData[SORTBUFNUM + MARKBUFNUM];
// for step-1 read
ReadData readData;
// for step-2 generate readends
GenREData genREData[GENBUFNUM];
// for step-3 sort each thread frags and pairs
SortData sortData[SORTBUFNUM];
// for step-4 mark duplicate
MarkDupData markDupData[MARKBUFNUM];
// for step-5 deal with intersect data
IntersectData intersectData;
};
struct REArrIdIdx {
int arrId = 0; // 数组索引
uint64_t arrIdx = 0; // 数组内部当前索引
const ReadEnds *pE = nullptr;
};
struct REGreaterThan {
bool operator()(const REArrIdIdx &a, const REArrIdIdx &b) { return *b.pE < *a.pE; }
};
struct ReadEndsHeap {
// 将冗余索引和他对应的task vector对应起来
vector<vector<ReadEnds>> *arr2d;
priority_queue<REArrIdIdx, vector<REArrIdIdx>, REGreaterThan> minHeap;
uint64_t popNum = 0;
int Init(vector<vector<ReadEnds>> *_arr2d) {
arr2d = _arr2d;
if (arr2d == nullptr) {
return -1;
}
for (int i = 0; i < arr2d->size(); ++i) {
auto &v = (*arr2d)[i];
if (!v.empty()) {
minHeap.push({i, 1, &v[0]});
}
}
return 0;
}
const ReadEnds *Pop() {
const ReadEnds *ret = nullptr;
if (!minHeap.empty()) {
auto minVal = minHeap.top();
minHeap.pop();
++popNum;
ret = minVal.pE;
auto &v = (*arr2d)[minVal.arrId];
if (v.size() > minVal.arrIdx) {
minHeap.push({minVal.arrId, minVal.arrIdx + 1, &v[minVal.arrIdx]});
}
}
return ret;
}
uint64_t Size() {
uint64_t len = 0;
if (arr2d != nullptr) {
for (auto &v : *arr2d) {
len += v.size();
}
}
return len - popNum;
}
};
// 并行运行mark duplicate
void pipelineMarkDups();