normal没问题了,tumor结果还不一样

This commit is contained in:
zzh 2023-12-04 18:02:07 +08:00
parent 36b1430da5
commit 2d114058a1
6 changed files with 717 additions and 850 deletions

2
run.sh
View File

@ -1,6 +1,6 @@
time /home/zzh/work/GeneKit/picard_cpp/build/bin/picard_cpp \
MarkDuplicates \
--INPUT /mnt/d/data/zy_normal.bam \
--INPUT /mnt/d/data/zy_tumor.bam \
--OUTPUT /mnt/d/data/out.bam \
--num_threads 1 \
--max_mem 4G \

View File

@ -40,6 +40,17 @@ using std::cout;
#define NO_SUCH_INDEX INT64_MAX
static Timer tm_arr[10]; // 用来测试性能
/* 全局本地变量 */
static vector<ReadNameParser> g_vRnParser; // 每个线程一个read name parser
static samFile *g_inBamFp; // 输入的bam文件
static sam_hdr_t *g_inBamHeader; // 输入的bam文件头信息
static samFile *g_outBamFp = nullptr; // 输出文件, sam或者bam格式
static sam_hdr_t *g_outBamHeader; // 输出文件的header
/* 参数对象作为全局对象,免得多次作为参数传入函数中 */
static GlobalArg &g_gArg = GlobalArg::Instance();
static MarkDupsArg g_mdArg;
#include "md_funcs.h"
#include "serial_md.h"

View File

@ -1,29 +1,71 @@
#include <robin-map/include/tsl/robin_map.h>
/* 前向声明 */
class ThMarkDupArg;
/* 全局本地变量 */
static queue<ThMarkDupArg *> g_qpThMarkDupArg; // 存放线程变量的队列
static lock_t *g_queueFirstLock = NEW_LOCK(-1); // 队列的第一个任务是否完成
static lock_t *g_readyToReadLock = NEW_LOCK(-1); // 通知主线程是否可以进行下一次读取
static vector<ReadNameParser> g_vRnParser; // 每个线程一个read name parser
static int g_numDuplicateIndices = 0; // 找到的冗余read总数
static samFile *g_inBamFp; // 输入的bam文件
static sam_hdr_t *g_inBamHeader; // 输入的bam文件头信息
static samFile *g_outBamFp = nullptr; // 输出文件, sam或者bam格式
static sam_hdr_t *g_outBamHeader; // 输出文件的header
static int g_maxJobNum = 0; // 每次读取新的数据后,新增的任务数量
static int g_jobNumForRead = 0; // 任务数量降到当前值时开始下一轮读取
static volatile int64_t g_bamLoadedNum = 0; // 已经读入的read总数
static volatile int64_t g_bamUsedNum = 0; // 已经处理完写入输出文件的read总数
static vector<int64_t> g_vDupIdx; // 线程内部计算得出的
static vector<int64_t> g_vOpticalDupIdx;
static set<int64_t> g_sDupIdxLatter;
static set<int64_t> g_sOpticalDupIdxLatter;
static bool g_serialProcess = false; // 是否串行运行
/* 存放readend或者冗余idx避免频繁的开辟和释放内存 */
template <class T>
struct DupContainer
{
vector<vector<T>> arr; // 类似map<int64_t, set<ReadEnds>> 或 map<int64_t, set<int64_t>>
vector<int64_t> pos; // arr中每个元素对应的position
// unordered_map<int64_t, int64_t> idx; // 某个位点对应在vector中的坐标
tsl::robin_map<int64_t, int64_t> idx; // 某个位点对应在vector中的坐标
int64_t size = 0; // 实际使用的空间
int64_t capacity = 0; // 内存容量
inline void Init()
{
idx.clear();
size = 0;
}
inline void SortAtPos(int64_t p) // 这里的pos表示位点
{
if (idx.find(p) != idx.end())
{
const int64_t i = idx.at(p);
std::sort(arr[i].begin(), arr[i].end());
}
}
inline void SortAtIdx(int64_t i) // 这里的i表示vector的索引
{
std::sort(arr[i].begin(), arr[i].end());
}
inline void RemoveAtPos(int64_t p)
{
if (idx.find(p) != idx.end())
{
const int64_t i = idx.at(p);
arr[i].clear();
}
}
inline void RemoveAtIdx(int64_t i) // 这里的i表示vector的索引
{
arr[i].clear();
}
inline void AddAtPos(int64_t p, T &val)
{
AtPos(p).push_back(val);
}
inline vector<T> &AtPos(int64_t p)
{
if (idx.find(p) != idx.end())
{
const int64_t i = idx.at(p);
return arr[i];
}
/* 参数对象作为全局对象,免得多次作为参数传入函数中 */
static GlobalArg &g_gArg = GlobalArg::Instance();
static MarkDupsArg g_mdArg;
if (size >= capacity)
{
capacity += 1;
arr.push_back(vector<T>());
pos.push_back(0);
}
const int64_t i = size++;
idx[p] = i;
pos[i] = p;
arr[i].clear();
return arr[i];
}
};
/* 清除key位置的数据 */
static inline void clearIdxAtPos(int64_t key, map<int64_t, set<int64_t>> *pmsIdx)
@ -119,8 +161,8 @@ static void addRepresentativeReadIndex(vector<const ReadEnds *> &vpRe)
/* 处理一组pairend的readends标记冗余 */
static void markDuplicatePairs(int64_t posKey,
vector<const ReadEnds *> &vpRe,
map<int64_t, set<int64_t>> *pmsDupIdx,
map<int64_t, set<int64_t>> *pmsOpticalDupIdx)
DupContainer<int64_t> *dupIdx,
DupContainer<int64_t> *opticalDupIdx)
{
if (vpRe.size() < 2)
{
@ -131,8 +173,9 @@ static void markDuplicatePairs(int64_t posKey,
return;
}
// cout << "pos:" << posKey + 1 << ";size:" << vpRe.size() << endl;
auto &sDupIdx = (*pmsDupIdx)[posKey];
auto &sOpticalDupIdx = (*pmsOpticalDupIdx)[posKey];
auto &vDupIdx = dupIdx->AtPos(posKey);
auto &vOpticalDupIdx = opticalDupIdx->AtPos(posKey);
int maxScore = 0;
const ReadEnds *pBest = nullptr;
/** All read ends should have orientation FF, FR, RF, or RR **/
@ -152,9 +195,9 @@ static void markDuplicatePairs(int64_t posKey,
{
if (pe != pBest) // 非best
{
sDupIdx.insert(pe->read1IndexInFile); // 添加read1
vDupIdx.push_back(pe->read1IndexInFile); // 添加read1
if (pe->read2IndexInFile != pe->read1IndexInFile)
sDupIdx.insert(pe->read2IndexInFile); // 添加read2
vDupIdx.push_back(pe->read2IndexInFile); // 添加read2
}
}
@ -168,9 +211,9 @@ static void markDuplicatePairs(int64_t posKey,
static void markDuplicateFragments(int64_t posKey,
vector<const ReadEnds *> &vpRe,
bool containsPairs,
map<int64_t, set<int64_t>> *pmsDupIdx)
DupContainer<int64_t> *dupIdx)
{
auto &sDupIdx = (*pmsDupIdx)[posKey];
auto &vDupIdx = dupIdx->AtPos(posKey);
if (containsPairs)
{
@ -178,7 +221,7 @@ static void markDuplicateFragments(int64_t posKey,
{
if (!pe->IsPaired())
{
sDupIdx.insert(pe->read1IndexInFile);
vDupIdx.push_back(pe->read1IndexInFile);
}
}
}
@ -199,7 +242,7 @@ static void markDuplicateFragments(int64_t posKey,
{
if (pe != pBest)
{
sDupIdx.insert(pe->read1IndexInFile);
vDupIdx.push_back(pe->read1IndexInFile);
}
}
}
@ -207,45 +250,47 @@ static void markDuplicateFragments(int64_t posKey,
/* 处理位于某个坐标的pairend reads */
static inline void handlePairs(int64_t posKey,
set<ReadEnds> &sReadEnds,
vector<ReadEnds> &readEnds,
vector<const ReadEnds *> &vpCache,
map<int64_t, set<int64_t>> *pmsDupIdx,
map<int64_t, set<int64_t>> *pmsOpticalDupIdx)
DupContainer<int64_t> *dupIdx,
DupContainer<int64_t> *opticalDupIdx)
{
if (sReadEnds.size() > 1) // 有潜在的冗余
if (readEnds.size() > 1) // 有潜在的冗余
{
vpCache.clear();
// std::sort(readEnds.begin(), readEnds.end());
const ReadEnds *pReadEnd = nullptr;
for (auto &re : sReadEnds)
for (auto &re : readEnds)
{
if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, true)) // 跟前一个一样
vpCache.push_back(&re); // 处理一个潜在的冗余组
else
{
markDuplicatePairs(posKey, vpCache, pmsDupIdx, pmsOpticalDupIdx); // 不一样
markDuplicatePairs(posKey, vpCache, dupIdx, opticalDupIdx); // 不一样
vpCache.clear();
vpCache.push_back(&re);
pReadEnd = &re;
}
}
markDuplicatePairs(posKey, vpCache, pmsDupIdx, pmsOpticalDupIdx);
markDuplicatePairs(posKey, vpCache, dupIdx, opticalDupIdx);
}
}
/* 处理位于某个坐标的 reads */
static inline void handleFrags(
int64_t posKey,
set<ReadEnds> &sReadEnds,
vector<ReadEnds> &readEnds,
vector<const ReadEnds *> &vpCache,
map<int64_t, set<int64_t>> *pmsDupIdx)
DupContainer<int64_t> *dupIdx)
{
if (sReadEnds.size() > 1) // 有潜在的冗余
if (readEnds.size() > 1) // 有潜在的冗余
{
vpCache.clear();
// std::sort(readEnds.begin(), readEnds.end());
const ReadEnds *pReadEnd = nullptr;
bool containsPairs = false;
bool containsFrags = false;
for (auto &re : sReadEnds)
for (auto &re : readEnds)
{
if (pReadEnd != nullptr && ReadEnds::AreComparableForDuplicates(*pReadEnd, re, false))
{
@ -257,7 +302,7 @@ static inline void handleFrags(
{
if (vpCache.size() > 1 && containsFrags)
{
markDuplicateFragments(posKey, vpCache, containsPairs, pmsDupIdx);
markDuplicateFragments(posKey, vpCache, containsPairs, dupIdx);
}
vpCache.clear();
vpCache.push_back(&re);
@ -268,13 +313,13 @@ static inline void handleFrags(
}
if (vpCache.size() > 1 && containsFrags)
{
markDuplicateFragments(posKey, vpCache, containsPairs, pmsDupIdx);
markDuplicateFragments(posKey, vpCache, containsPairs, dupIdx);
}
}
}
/* 对找到的pairend read end添加一些信息 */
static inline void modifyPairedEnds(ReadEnds &fragEnd, ReadEnds *pPairedEnds)
static inline void modifyPairedEnds(const ReadEnds &fragEnd, ReadEnds *pPairedEnds)
{
auto &pairedEnds = *pPairedEnds;
int64_t bamIdx = fragEnd.read1IndexInFile;

View File

@ -1,540 +1,4 @@
/* 多线程处理冗余参数结构体 */
struct ThMarkDupArg
void parallelMarkDups()
{
int64_t bamStartIdx; // 当前vBam数组中第一个bam记录在整体bam中所处的位置
long seq; // 当前任务在所有任务的排序
bool more; // 后面还有任务
volatile bool finish; // 当前任务有没有处理完
vector<BamWrap *> vBam; // 存放待处理的bam read
map<int64_t, vector<ReadEnds>> mvPair; // 以冗余位置为索引保存所有pairend reads
map<int64_t, vector<ReadEnds>> mvFrag; // 保存所有reads包括pairend
map<int64_t, set<int64_t>> msPairDupIdx; // pair的冗余read的索引
map<int64_t, set<int64_t>> msPairOpticalDupIdx; // optical冗余read的索引
map<int64_t, set<int64_t>> msFragDupIdx; // frag的冗余read的索引
map<int64_t, set<int64_t>> msFragOpticalDupIdx; // 这个好像没用
unordered_map<string, ReadEnds> umReadEnds; // 用来寻找pair end
};
/*
* 线
*/
void thread_markdups(void *arg, int tid)
{
auto &p = *(ThMarkDupArg *)arg;
/* 处理每个read创建ReadEnd并放入frag和pair中 */
for (int i = 0; i < p.vBam.size(); ++i) // 循环处理每个read
{
BamWrap *bw = p.vBam[i];
const int64_t bamIdx = p.bamStartIdx + i;
if (bw->GetReadUnmappedFlag())
{
if (bw->b->core.tid == -1)
// When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort).
break;
}
else if (!bw->IsSecondaryOrSupplementary()) // 是主要比对
{
ReadEnds fragEnd;
buildReadEnds(*bw, bamIdx, g_vRnParser[tid], &fragEnd);
// if (fragEnd.read1Coordinate == 69662)
// {
// cout << fragEnd.read1Coordinate << '\t' << bw->GetUnclippedEnd() << '\t'
// << bw->GetUnclippedStart() << '\t' << bw->query_name() << '\t'
// << bw->cigar_str() << '\t' << bw->b->core.pos << '\t' <<
// bw->b->core.mpos << endl;
// }
p.mvFrag[fragEnd.posKey].push_back(fragEnd); // 添加进frag集合
if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) // 是pairend而且互补的read也比对上了
{
// if (bw->b->core.pos == 69813 || bw->b->core.pos == 69884)
// {
// cout << fragEnd.read1Coordinate << '\t' << bw->query_name() << endl;
// }
string key = bw->query_name();
if (p.umReadEnds.find(key) == p.umReadEnds.end())
{
p.umReadEnds[key] = fragEnd;
}
else // 找到了pairend
{
auto &pairedEnds = p.umReadEnds.at(key);
modifyPairedEnds(fragEnd, &pairedEnds);
p.mvPair[pairedEnds.posKey].push_back(pairedEnds);
p.umReadEnds.erase(key); // 删除找到的pairend
// if (pairedEnds.read1Coordinate == 69662)
//{
// cout << pairedEnds.posKey << endl;
// cout << pairedEnds.read1Coordinate << '\t'
// << pairedEnds.read2Coordinate << '\t'
// << (int)pairedEnds.orientation << endl;
// //<< fragEnd.read1Coordinate << '\t'
// //<< fragEnd.posKey << '\t'
// //<< bw->b->core.tid << '\t' << bw->b->core.pos << '\t'
// //<< bw->GetUnclippedEnd() << '\t' << bw->GetUnclippedStart() << endl;
// }
}
}
}
}
/* generateDuplicateIndexes计算冗余read在所有read中的位置索引 */
unordered_set<int64_t> usUnpairedPos; // 该位置有还未找到pair的read
for (auto &ele : p.umReadEnds)
{
usUnpairedPos.insert(ele.second.posKey);
}
// 先处理 pair
int dupNum = 0;
vector<ReadEnds *> vRePotentialDup; // 有可能是冗余的reads
for (auto &e : p.mvPair) // 按比对的位置先后进行遍历
{
// 如果这个位置里所有的read没有缺少pair的那么就处理否则跳过
// if (usUnpairedPos.find(e.first) == usUnpairedPos.end())
// handlePairs(e.first, e.second, vRePotentialDup, &p.msPairDupIdx, &p.msPairOpticalDupIdx);
}
// 再处理frag
for (auto &e : p.mvFrag)
{
// handleFrags(e.first, e.second, vRePotentialDup, &p.msFragDupIdx, &p.msFragOpticalDupIdx);
}
// cout << tid << '\t' << "dup: " << dupNum << endl;
// cout << tid << " all: no: " << p.vBam.size() << '\t' << p.umReadEnds.size() << endl;
/* 本段数据处理完成,告诉输出线程 */
if (!g_serialProcess)
{
POSSESS(g_queueFirstLock);
p.finish = true;
// cout << tid << ": process: " << p.seq << endl;
auto front = g_qpThMarkDupArg.front();
if (g_qpThMarkDupArg.size() > 0) // 表明是并行处理
{
if (front->finish)
{
TWIST(g_queueFirstLock, TO, front->seq); // 通知写线程,当前队列头部完成的任务
}
else
{
RELEASE(g_queueFirstLock);
}
}
}
}
/*
* 线线
* 1. precurprecur
* 2. pairendpre
* 3. mvUnpaired
*/
void thread_merge(void *)
{
bool more = false; // 是否还有下一个任务
long seq = 0; // 任务序列号
long unPairedNum = 0; // 没有找到pair的总数量
long dupReadNum = 0; // 冗余read总数量
POSSESS(g_queueFirstLock);
WAIT_FOR(g_queueFirstLock, TO_BE, seq++); // 等待首个任务完成
auto lastP = g_qpThMarkDupArg.front(); // 取队首的数据
// unordered_map<string, ReadEnds> umUnpairedReadEnds; // 还未找到pair的read
map<int64_t, vector<ReadEnds>> mvUnPaired; // 这些坐标对应的reads里有还没找到pair的
unordered_set<int64_t> usUnpairedPos; // 上一轮中不需要计算的点unpaired
unordered_map<int64_t, vector<ReadEnds>> mvPair; // 上一轮中遗留的需要重新计算的pair包括重叠和没paired
unordered_map<int64_t, vector<ReadEnds>> mvFrag; // 上一轮中需要重新计算的frag(与这一轮位置有重叠的)
vector<ReadEnds *> vRePotentialDup; // 有可能是冗余的reads
map<int64_t, set<int64_t>> msPairDupIdx; // pair的冗余read的索引
map<int64_t, set<int64_t>> msPairOpticalDupIdx; // optical冗余read的索引
auto umUnpairedReadEnds = lastP->umReadEnds;
auto p = lastP;
g_qpThMarkDupArg.pop(); // 删除队首
TWIST(g_queueFirstLock, TO, seq); // 解锁
more = lastP->more; // 是否还有下一个任务
while (more) // 循环处理,将结果写入文件
{
POSSESS(g_queueFirstLock);
if (g_qpThMarkDupArg.empty()) // 有可能新任务没来得及添加进队列
{
RELEASE(g_queueFirstLock);
continue;
}
WAIT_FOR(g_queueFirstLock, TO_BE, seq); // 等待任务完成
p = g_qpThMarkDupArg.front();
if (!p->finish) // 有可能这个任务没有完成是下边那个TWIST导致进到这里因为这一段代码可能运行比较快
{
TWIST(g_queueFirstLock, TO, -1); // 此时队首任务没完成,-1可以让锁无法进入到这里避免无效获得锁
continue;
}
g_qpThMarkDupArg.pop();
TWIST(g_queueFirstLock, TO, seq + 1);
/* 处理结果数据 */
// cout << "finish: " << seq - 1 << '\t' << "lastIdx: " << p->bamStartIdx+p->vBam.size() << endl;
// 找paired中重叠的部分
mvPair.clear();
mvFrag.clear();
usUnpairedPos.clear();
int64_t lastPairPos = lastP->mvPair.rbegin()->first; // 上一轮read最后到达的坐标
for (auto &pair : p->mvPair)
{
const int64_t pos = pair.first;
if (pos > lastPairPos) // 超过了上一个任务最大的位点坐标,那么就不再继续检查了
break;
mvPair.insert(pair); // 保存此轮任务当前位点的数据
clearIdxAtPos(pos, &p->msPairDupIdx); // 清除该位点的冗余结果
clearIdxAtPos(pos, &p->msPairOpticalDupIdx);
if (lastP->mvPair.find(pos) != lastP->mvPair.end()) // 上一个任务同一个位点也有数据
{
mvPair[pos].insert(mvPair[pos].end(), lastP->mvPair[pos].begin(), lastP->mvPair[pos].end());
clearIdxAtPos(pos, &lastP->msPairDupIdx); // 清除该位点的冗余结果
clearIdxAtPos(pos, &lastP->msPairOpticalDupIdx);
}
}
// 找上一轮中没配对的pair
for (auto itr = p->umReadEnds.begin(); itr != p->umReadEnds.end();) // 在当前任务中找有没有与上一个任务中没匹配的read相匹配的pair
{
auto key = itr->second.posKey;
auto &fragEnd = itr->second;
if (lastP->umReadEnds.find(itr->first) != lastP->umReadEnds.end())
{
auto &pairedEnds = lastP->umReadEnds.at(itr->first);
modifyPairedEnds(fragEnd, &pairedEnds);
mvPair[key].push_back(pairedEnds);
lastP->umReadEnds.erase(itr->first); // 删除
if (umUnpairedReadEnds.find(itr->first) != umUnpairedReadEnds.end())
umUnpairedReadEnds.erase(itr->first);
itr = p->umReadEnds.erase(itr); // 删除本轮中对应的unpaired read
}
else
{
if (umUnpairedReadEnds.find(itr->first) != umUnpairedReadEnds.end()) // 前边一直没找到paired的数据在这一轮找到了
{
auto &pairedEnds = umUnpairedReadEnds.at(itr->first);
modifyPairedEnds(fragEnd, &pairedEnds);
mvUnPaired[key].push_back(pairedEnds);
umUnpairedReadEnds.erase(itr->first); // 删除
itr = p->umReadEnds.erase(itr); // 删除本轮中对应的unpaired read
}
else // 新的没配对的
{
umUnpairedReadEnds.insert(*itr); // 没有pair则添加
auto &v = p->mvPair[key];
mvUnPaired[key].insert(mvUnPaired[key].end(), v.begin(), v.end());
++itr;
}
}
}
// 计算上一轮还需要计算的pair
for (auto &e : lastP->umReadEnds)
usUnpairedPos.insert(e.second.posKey);
for (auto &e : mvPair)
{
//if (usUnpairedPos.find(e.first) == usUnpairedPos.end())
// handlePairs(e.first, e.second, vRePotentialDup, &lastP->msPairDupIdx, &lastP->msPairOpticalDupIdx);
}
// 计算多轮之后遗留的pair
usUnpairedPos.clear();
for (auto &e : umUnpairedReadEnds)
usUnpairedPos.insert(e.second.posKey);
for (auto itr = mvUnPaired.begin(); itr != mvUnPaired.end();)
{
if (usUnpairedPos.find(itr->first) == usUnpairedPos.end())
{
// handlePairs(itr->first, itr->second, vRePotentialDup, &msPairDupIdx, &msPairOpticalDupIdx);
// itr = mvUnPaired.erase(itr);
}
else
{
++itr;
}
}
// 找上一轮重叠的frag
int64_t lastFragPos = lastP->mvFrag.rbegin()->first;
for (auto &pair : p->mvFrag)
{
const int64_t pos = pair.first;
if (pos > lastPairPos) // 超过了上一个任务最大的位点坐标,那么就不再继续检查了
break;
mvFrag.insert(pair); // 保存此轮任务当前位点的数据
clearIdxAtPos(pos, &p->msFragDupIdx); // 清除该位点的冗余结果
if (lastP->mvFrag.find(pos) != lastP->mvFrag.end()) // 上一个任务同一个位点也有数据
{
mvFrag[pos].insert(mvFrag[pos].end(), lastP->mvFrag[pos].begin(), lastP->mvFrag[pos].end());
clearIdxAtPos(pos, &lastP->msFragDupIdx); // 上一个任务该位点有冗余结果
}
}
for (auto &e : mvFrag)
{
// handleFrags(e.first, e.second, vRePotentialDup, &lastP->msFragDupIdx, &lastP->msFragOpticalDupIdx);
}
// 计算冗余数量
for (auto &ele : lastP->msPairDupIdx)
dupReadNum += ele.second.size();
for (auto &ele : lastP->msFragDupIdx)
dupReadNum += ele.second.size();
/* 更新处理完的read数量和状态 */
POSSESS(g_readyToReadLock);
g_bamUsedNum += lastP->vBam.size();
// cout << "write: " << g_qpThMarkDupArg.size() << endl;
if (g_qpThMarkDupArg.size() <= g_jobNumForRead)
{
TWIST(g_readyToReadLock, TO, 1);
}
else
{
RELEASE(g_readyToReadLock);
}
/* 准备下一轮循环 */
delete lastP;
more = p->more;
lastP = p;
seq++;
// cout << "unpaired: " << lastP->umReadEnds.size() << endl;
}
unPairedNum = umUnpairedReadEnds.size();
// 计算冗余数量
for (auto &ele : lastP->msPairDupIdx)
dupReadNum += ele.second.size();
for (auto &ele : lastP->msFragDupIdx)
dupReadNum += ele.second.size();
// 多轮遗留的
for (auto &ele : msPairDupIdx)
dupReadNum += ele.second.size();
// cout << "Finally unpaired read num: " << unPairedNum << endl;
// cout << "Finally dulicate read num: " << dupReadNum << endl;
// cout << "last unpaired num: " << lastP->umReadEnds.size() << endl;
int unpairedReads = 0;
for (auto &e : mvUnPaired)
unpairedReads += e.second.size();
// cout << "mvUnpaired size: " << mvUnPaired.size() << endl;
// cout << "mvUnpaired vector size: " << unpairedReads << endl;
// 最后一个数据处理完了
POSSESS(g_readyToReadLock);
g_bamUsedNum += lastP->vBam.size();
TWIST(g_readyToReadLock, TO, 1);
// cout << "last finish: " << seq - 1 << endl;
delete lastP;
pthread_exit(0);
}
/* 串行运行 */
void serialProcessData(BamBufType &inBamBuf)
{
ThMarkDupArg p({0,
0,
true,
true,
inBamBuf.Slice(0, inBamBuf.Size())});
thread_markdups(&p, 0);
/* generateDuplicateIndexes计算冗余read在所有read中的位置索引 */
unordered_set<int64_t> usUnpairedPos; // 该位置有还未找到pair的read
for (auto &ele : p.umReadEnds)
{
usUnpairedPos.insert(ele.second.posKey);
}
// 先处理 pair
int dupNum = 0;
vector<ReadEnds *> vRePotentialDup; // 有可能是冗余的reads
for (auto &e : p.mvPair) // 按比对的位置先后进行遍历
{
// 如果这个位置里所有的read没有缺少pair的那么就处理否则跳过
// if (usUnpairedPos.find(e.first) != usUnpairedPos.end())
// handlePairs(e.first, e.second, vRePotentialDup, &p.msPairDupIdx, &p.msPairOpticalDupIdx);
}
// int64_t estimateDupNum = 0;
// for (auto &e : p.mvPair)
// {
// int FF = 0;
// int FR = 0;
// int RF = 0;
// int RR = 0;
// for (auto &re : e.second)
// {
// if (re.orientation == ReadEnds::FF)
// FF++;
// else if (re.orientation == ReadEnds::FR)
// FR++;
// else if (re.orientation == ReadEnds::RF)
// RF++;
// else
// RR++;
// }
// if (FF > 1) {
// estimateDupNum += (FF - 1);
// }
// if (FR > 1)
// {
// estimateDupNum += (FR - 1);
// }
// if (RF > 1)
// {
// estimateDupNum += (RF - 1);
// }
// if (RR > 1)
// {
// estimateDupNum += (RR - 1);
// }
// }
// cout << "Estimate dup num: " << estimateDupNum << endl;
// cout << "Finally unpaired read num: " << p.umReadEnds.size() << endl;
int dupReadNum = 0;
for (auto &ele : p.msPairDupIdx)
dupReadNum += ele.second.size();
cout << "pair dulicate read num: " << dupReadNum << endl;
for (auto &ele : p.msFragDupIdx)
dupReadNum += ele.second.size();
cout << "Finally dulicate read num: " << dupReadNum << endl;
// int pairNum = 0, fragNum = 0;
// for (auto &e : p.mvPair)
// pairNum += e.second.size();
// for (auto &e : p.mvFrag)
// fragNum += e.second.size();
// cout << "pair num: " << pairNum << endl;
// cout << "frag num: " << fragNum << endl;
}
static void parallelMarkDups()
{
// /* 读取缓存初始化 */
BamBufType inBamBuf(g_gArg.use_asyncio);
inBamBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem);
/* 循环读入信息,并处理 */
g_maxJobNum = g_gArg.num_threads * 10;
// g_maxJobNum = g_gArg.num_threads * 3;
g_jobNumForRead = g_gArg.num_threads * 2;
int64_t x_all = 0; // for test
int64_t jobSeq = 0;
int64_t processedBamNum = 0; // 记录每个轮次累计处理的reads数量用来计算每个read在整个文件中的索引位置
threadpool thpool;
thread *mergeth;
if (g_gArg.num_threads == 1)
{
g_serialProcess = true;
}
else
{
thpool = thpool_init(g_gArg.num_threads); // 创建mark dup所需的线程池
mergeth = LAUNCH(thread_merge, nullptr); // 启动处理结果的的线程,合并所有线程的结果
}
const int minReadNum = 1000000; // 最小并行处理的read数量
int bamRemainSize = 0; // 上一轮还剩下的bam数量包含已经在任务里的和没有放进任务的
int numReadsForEachJob = 0; // 每个线程处理的read数量第一次读取的时候进行设置
int lastRoundUnProcessed = 0; // 上一轮没有放进任务里的read数量
int curRoundProcessed = 0; // 这一轮放进任务的read数量
while (inBamBuf.ReadStat() >= 0)
{
/* 读取bam文件中的read */
size_t readNum = inBamBuf.ReadBam();
cout << readNum << endl; // 这一轮读取的bam数量
// if (readNum <= minReadNum)
// {
// serialProcess = true;
// // 调用串行运行代码
// serialProcessData(inBamBuf);
// break;
// }
if (g_serialProcess)
{
tm_arr[0].acc_start();
serialProcessData(inBamBuf);
tm_arr[0].acc_end();
inBamBuf.ClearAll();
continue;
}
if (numReadsForEachJob == 0)
numReadsForEachJob = readNum / g_maxJobNum; // 第一次读取bam的时候进行设置
g_bamLoadedNum += readNum;
/* 多线程处理 任务数是线程数的10倍 */
curRoundProcessed = 0; // 当前轮次已经处理的reads数量
int numNeedToProcess = inBamBuf.Size() - bamRemainSize + lastRoundUnProcessed; // 当前需要处理的bam数量
for (int i = 0; numNeedToProcess >= numReadsForEachJob; ++i) // 只有待处理的reads数量大于一次任务的数量时新建任务
{
int startIdx = i * numReadsForEachJob + bamRemainSize - lastRoundUnProcessed;
int endIdx = (i + 1) * numReadsForEachJob + bamRemainSize - lastRoundUnProcessed;
ThMarkDupArg *thArg = new ThMarkDupArg({processedBamNum + curRoundProcessed,
jobSeq++,
true,
false,
inBamBuf.Slice(startIdx, endIdx)});
POSSESS(g_queueFirstLock); // 加锁
g_qpThMarkDupArg.push(thArg); // 将新任务需要的参数添加到队列
RELEASE(g_queueFirstLock); // 解锁
thpool_add_work(thpool, thread_markdups, (void *)thArg); // 添加新任务
curRoundProcessed += endIdx - startIdx;
numNeedToProcess -= numReadsForEachJob;
}
processedBamNum += curRoundProcessed;
lastRoundUnProcessed = numNeedToProcess;
/* 等待可以继续读取的信号 */
POSSESS(g_readyToReadLock);
WAIT_FOR(g_readyToReadLock, TO_BE, 1);
bamRemainSize = g_bamLoadedNum - g_bamUsedNum;
while (bamRemainSize >= inBamBuf.Size() / 2)
{ // 要保留的多于现在有的bam数量的一半那就等待write线程继续处理
TWIST(g_readyToReadLock, TO, 0);
POSSESS(g_readyToReadLock);
WAIT_FOR(g_readyToReadLock, TO_BE, 1);
bamRemainSize = g_bamLoadedNum - g_bamUsedNum;
}
inBamBuf.ClearBeforeIdx(inBamBuf.Size() - bamRemainSize); // 清理掉已经处理完的reads
// cout << g_bamLoadedNum << '\t' << g_bamUsedNum << '\t' << bamRemainSize << '\t' << inBamBuf.Size() << endl;
TWIST(g_readyToReadLock, TO, 0);
}
if (!g_serialProcess)
{
/* 数据读完了放一个空的任务好让write thread停下来 */
ThMarkDupArg *thArg = nullptr;
if (lastRoundUnProcessed > 0) // 最后一轮还有没有添加进任务的read数据
{
thArg = new ThMarkDupArg({processedBamNum + curRoundProcessed, jobSeq++, false, false,
inBamBuf.Slice(inBamBuf.Size() - lastRoundUnProcessed, inBamBuf.Size())});
processedBamNum += lastRoundUnProcessed;
}
else
{
thArg = new ThMarkDupArg({0, jobSeq++, false, false});
}
POSSESS(g_queueFirstLock); // 加锁
g_qpThMarkDupArg.push(thArg); // 将新任务需要的参数添加到队列
RELEASE(g_queueFirstLock); // 解锁
thpool_add_work(thpool, thread_markdups, (void *)thArg); // 添加新任务
/* 同步所有线程 */
thpool_wait(thpool);
thpool_destroy(thpool);
JOIN(mergeth);
}
inBamBuf.FreeMemory();
cout << "x_all: " << x_all << endl;
cout << "loaded: " << g_bamLoadedNum << endl;
cout << " used: " << g_bamUsedNum << endl;
cout << "processedBamNum: " << processedBamNum << endl;
}

File diff suppressed because it is too large Load Diff

View File

@ -101,6 +101,15 @@ struct ReadEnds : PhysicalLocation
return areComparable;
}
// 找某一个位置的所有readend时需要
static bool pairsCmp(const ReadEnds &lhs, const ReadEnds &rhs)
{
int comp = lhs.read1ReferenceIndex - rhs.read1ReferenceIndex;
if (comp == 0)
comp = lhs.read1Coordinate - rhs.read1Coordinate;
return comp < 0;
}
/* 比对方向是否正向 */
bool IsPositiveStrand() const
{