picard_cpp/src/sam/markdups/parallel_md.h

540 lines
23 KiB
C
Raw Normal View History

/* 多线程处理冗余参数结构体 */
struct ThMarkDupArg
{
int64_t bamStartIdx; // 当前vBam数组中第一个bam记录在整体bam中所处的位置
long seq; // 当前任务在所有任务的排序
bool more; // 后面还有任务
volatile bool finish; // 当前任务有没有处理完
vector<BamWrap *> vBam; // 存放待处理的bam read
map<int64_t, vector<ReadEnds>> mvPair; // 以冗余位置为索引保存所有pairend reads
map<int64_t, vector<ReadEnds>> mvFrag; // 保存所有reads包括pairend
map<int64_t, set<int64_t>> msPairDupIdx; // pair的冗余read的索引
map<int64_t, set<int64_t>> msPairOpticalDupIdx; // optical冗余read的索引
map<int64_t, set<int64_t>> msFragDupIdx; // frag的冗余read的索引
map<int64_t, set<int64_t>> msFragOpticalDupIdx; // 这个好像没用
unordered_map<string, ReadEnds> umReadEnds; // 用来寻找pair end
};
/*
* 线
*/
void thread_markdups(void *arg, int tid)
{
auto &p = *(ThMarkDupArg *)arg;
/* 处理每个read创建ReadEnd并放入frag和pair中 */
for (int i = 0; i < p.vBam.size(); ++i) // 循环处理每个read
{
BamWrap *bw = p.vBam[i];
const int64_t bamIdx = p.bamStartIdx + i;
if (bw->GetReadUnmappedFlag())
{
if (bw->b->core.tid == -1)
// When we hit the unmapped reads with no coordinate, no reason to continue (only in coordinate sort).
break;
}
else if (!bw->IsSecondaryOrSupplementary()) // 是主要比对
{
ReadEnds fragEnd;
buildReadEnds(*bw, bamIdx, g_vRnParser[tid], &fragEnd);
// if (fragEnd.read1Coordinate == 69662)
// {
// cout << fragEnd.read1Coordinate << '\t' << bw->GetUnclippedEnd() << '\t'
// << bw->GetUnclippedStart() << '\t' << bw->query_name() << '\t'
// << bw->cigar_str() << '\t' << bw->b->core.pos << '\t' <<
// bw->b->core.mpos << endl;
// }
p.mvFrag[fragEnd.posKey].push_back(fragEnd); // 添加进frag集合
if (bw->GetReadPairedFlag() && !bw->GetMateUnmappedFlag()) // 是pairend而且互补的read也比对上了
{
// if (bw->b->core.pos == 69813 || bw->b->core.pos == 69884)
// {
// cout << fragEnd.read1Coordinate << '\t' << bw->query_name() << endl;
// }
string key = bw->query_name();
if (p.umReadEnds.find(key) == p.umReadEnds.end())
{
p.umReadEnds[key] = fragEnd;
}
else // 找到了pairend
{
auto &pairedEnds = p.umReadEnds.at(key);
modifyPairedEnds(fragEnd, &pairedEnds);
p.mvPair[pairedEnds.posKey].push_back(pairedEnds);
p.umReadEnds.erase(key); // 删除找到的pairend
// if (pairedEnds.read1Coordinate == 69662)
//{
// cout << pairedEnds.posKey << endl;
// cout << pairedEnds.read1Coordinate << '\t'
// << pairedEnds.read2Coordinate << '\t'
// << (int)pairedEnds.orientation << endl;
// //<< fragEnd.read1Coordinate << '\t'
// //<< fragEnd.posKey << '\t'
// //<< bw->b->core.tid << '\t' << bw->b->core.pos << '\t'
// //<< bw->GetUnclippedEnd() << '\t' << bw->GetUnclippedStart() << endl;
// }
}
}
}
}
/* generateDuplicateIndexes计算冗余read在所有read中的位置索引 */
unordered_set<int64_t> usUnpairedPos; // 该位置有还未找到pair的read
for (auto &ele : p.umReadEnds)
{
usUnpairedPos.insert(ele.second.posKey);
}
// 先处理 pair
int dupNum = 0;
vector<ReadEnds *> vRePotentialDup; // 有可能是冗余的reads
for (auto &e : p.mvPair) // 按比对的位置先后进行遍历
{
// 如果这个位置里所有的read没有缺少pair的那么就处理否则跳过
// if (usUnpairedPos.find(e.first) == usUnpairedPos.end())
// handlePairs(e.first, e.second, vRePotentialDup, &p.msPairDupIdx, &p.msPairOpticalDupIdx);
}
// 再处理frag
for (auto &e : p.mvFrag)
{
// handleFrags(e.first, e.second, vRePotentialDup, &p.msFragDupIdx, &p.msFragOpticalDupIdx);
}
// cout << tid << '\t' << "dup: " << dupNum << endl;
// cout << tid << " all: no: " << p.vBam.size() << '\t' << p.umReadEnds.size() << endl;
/* 本段数据处理完成,告诉输出线程 */
if (!g_serialProcess)
{
POSSESS(g_queueFirstLock);
p.finish = true;
// cout << tid << ": process: " << p.seq << endl;
auto front = g_qpThMarkDupArg.front();
if (g_qpThMarkDupArg.size() > 0) // 表明是并行处理
{
if (front->finish)
{
TWIST(g_queueFirstLock, TO, front->seq); // 通知写线程,当前队列头部完成的任务
}
else
{
RELEASE(g_queueFirstLock);
}
}
}
}
/*
* 线线
* 1. precurprecur
* 2. pairendpre
* 3. mvUnpaired
*/
void thread_merge(void *)
{
bool more = false; // 是否还有下一个任务
long seq = 0; // 任务序列号
long unPairedNum = 0; // 没有找到pair的总数量
long dupReadNum = 0; // 冗余read总数量
POSSESS(g_queueFirstLock);
WAIT_FOR(g_queueFirstLock, TO_BE, seq++); // 等待首个任务完成
auto lastP = g_qpThMarkDupArg.front(); // 取队首的数据
// unordered_map<string, ReadEnds> umUnpairedReadEnds; // 还未找到pair的read
map<int64_t, vector<ReadEnds>> mvUnPaired; // 这些坐标对应的reads里有还没找到pair的
unordered_set<int64_t> usUnpairedPos; // 上一轮中不需要计算的点unpaired
unordered_map<int64_t, vector<ReadEnds>> mvPair; // 上一轮中遗留的需要重新计算的pair包括重叠和没paired
unordered_map<int64_t, vector<ReadEnds>> mvFrag; // 上一轮中需要重新计算的frag(与这一轮位置有重叠的)
vector<ReadEnds *> vRePotentialDup; // 有可能是冗余的reads
map<int64_t, set<int64_t>> msPairDupIdx; // pair的冗余read的索引
map<int64_t, set<int64_t>> msPairOpticalDupIdx; // optical冗余read的索引
auto umUnpairedReadEnds = lastP->umReadEnds;
auto p = lastP;
g_qpThMarkDupArg.pop(); // 删除队首
TWIST(g_queueFirstLock, TO, seq); // 解锁
more = lastP->more; // 是否还有下一个任务
while (more) // 循环处理,将结果写入文件
{
POSSESS(g_queueFirstLock);
if (g_qpThMarkDupArg.empty()) // 有可能新任务没来得及添加进队列
{
RELEASE(g_queueFirstLock);
continue;
}
WAIT_FOR(g_queueFirstLock, TO_BE, seq); // 等待任务完成
p = g_qpThMarkDupArg.front();
if (!p->finish) // 有可能这个任务没有完成是下边那个TWIST导致进到这里因为这一段代码可能运行比较快
{
TWIST(g_queueFirstLock, TO, -1); // 此时队首任务没完成,-1可以让锁无法进入到这里避免无效获得锁
continue;
}
g_qpThMarkDupArg.pop();
TWIST(g_queueFirstLock, TO, seq + 1);
/* 处理结果数据 */
// cout << "finish: " << seq - 1 << '\t' << "lastIdx: " << p->bamStartIdx+p->vBam.size() << endl;
// 找paired中重叠的部分
mvPair.clear();
mvFrag.clear();
usUnpairedPos.clear();
int64_t lastPairPos = lastP->mvPair.rbegin()->first; // 上一轮read最后到达的坐标
for (auto &pair : p->mvPair)
{
const int64_t pos = pair.first;
if (pos > lastPairPos) // 超过了上一个任务最大的位点坐标,那么就不再继续检查了
break;
mvPair.insert(pair); // 保存此轮任务当前位点的数据
clearIdxAtPos(pos, &p->msPairDupIdx); // 清除该位点的冗余结果
clearIdxAtPos(pos, &p->msPairOpticalDupIdx);
if (lastP->mvPair.find(pos) != lastP->mvPair.end()) // 上一个任务同一个位点也有数据
{
mvPair[pos].insert(mvPair[pos].end(), lastP->mvPair[pos].begin(), lastP->mvPair[pos].end());
clearIdxAtPos(pos, &lastP->msPairDupIdx); // 清除该位点的冗余结果
clearIdxAtPos(pos, &lastP->msPairOpticalDupIdx);
}
}
// 找上一轮中没配对的pair
for (auto itr = p->umReadEnds.begin(); itr != p->umReadEnds.end();) // 在当前任务中找有没有与上一个任务中没匹配的read相匹配的pair
{
auto key = itr->second.posKey;
auto &fragEnd = itr->second;
if (lastP->umReadEnds.find(itr->first) != lastP->umReadEnds.end())
{
auto &pairedEnds = lastP->umReadEnds.at(itr->first);
modifyPairedEnds(fragEnd, &pairedEnds);
mvPair[key].push_back(pairedEnds);
lastP->umReadEnds.erase(itr->first); // 删除
if (umUnpairedReadEnds.find(itr->first) != umUnpairedReadEnds.end())
umUnpairedReadEnds.erase(itr->first);
itr = p->umReadEnds.erase(itr); // 删除本轮中对应的unpaired read
}
else
{
if (umUnpairedReadEnds.find(itr->first) != umUnpairedReadEnds.end()) // 前边一直没找到paired的数据在这一轮找到了
{
auto &pairedEnds = umUnpairedReadEnds.at(itr->first);
modifyPairedEnds(fragEnd, &pairedEnds);
mvUnPaired[key].push_back(pairedEnds);
umUnpairedReadEnds.erase(itr->first); // 删除
itr = p->umReadEnds.erase(itr); // 删除本轮中对应的unpaired read
}
else // 新的没配对的
{
umUnpairedReadEnds.insert(*itr); // 没有pair则添加
auto &v = p->mvPair[key];
mvUnPaired[key].insert(mvUnPaired[key].end(), v.begin(), v.end());
++itr;
}
}
}
// 计算上一轮还需要计算的pair
for (auto &e : lastP->umReadEnds)
usUnpairedPos.insert(e.second.posKey);
for (auto &e : mvPair)
{
//if (usUnpairedPos.find(e.first) == usUnpairedPos.end())
// handlePairs(e.first, e.second, vRePotentialDup, &lastP->msPairDupIdx, &lastP->msPairOpticalDupIdx);
}
// 计算多轮之后遗留的pair
usUnpairedPos.clear();
for (auto &e : umUnpairedReadEnds)
usUnpairedPos.insert(e.second.posKey);
for (auto itr = mvUnPaired.begin(); itr != mvUnPaired.end();)
{
if (usUnpairedPos.find(itr->first) == usUnpairedPos.end())
{
// handlePairs(itr->first, itr->second, vRePotentialDup, &msPairDupIdx, &msPairOpticalDupIdx);
// itr = mvUnPaired.erase(itr);
}
else
{
++itr;
}
}
// 找上一轮重叠的frag
int64_t lastFragPos = lastP->mvFrag.rbegin()->first;
for (auto &pair : p->mvFrag)
{
const int64_t pos = pair.first;
if (pos > lastPairPos) // 超过了上一个任务最大的位点坐标,那么就不再继续检查了
break;
mvFrag.insert(pair); // 保存此轮任务当前位点的数据
clearIdxAtPos(pos, &p->msFragDupIdx); // 清除该位点的冗余结果
if (lastP->mvFrag.find(pos) != lastP->mvFrag.end()) // 上一个任务同一个位点也有数据
{
mvFrag[pos].insert(mvFrag[pos].end(), lastP->mvFrag[pos].begin(), lastP->mvFrag[pos].end());
clearIdxAtPos(pos, &lastP->msFragDupIdx); // 上一个任务该位点有冗余结果
}
}
for (auto &e : mvFrag)
{
// handleFrags(e.first, e.second, vRePotentialDup, &lastP->msFragDupIdx, &lastP->msFragOpticalDupIdx);
}
// 计算冗余数量
for (auto &ele : lastP->msPairDupIdx)
dupReadNum += ele.second.size();
for (auto &ele : lastP->msFragDupIdx)
dupReadNum += ele.second.size();
/* 更新处理完的read数量和状态 */
POSSESS(g_readyToReadLock);
g_bamUsedNum += lastP->vBam.size();
// cout << "write: " << g_qpThMarkDupArg.size() << endl;
if (g_qpThMarkDupArg.size() <= g_jobNumForRead)
{
TWIST(g_readyToReadLock, TO, 1);
}
else
{
RELEASE(g_readyToReadLock);
}
/* 准备下一轮循环 */
delete lastP;
more = p->more;
lastP = p;
seq++;
// cout << "unpaired: " << lastP->umReadEnds.size() << endl;
}
unPairedNum = umUnpairedReadEnds.size();
// 计算冗余数量
for (auto &ele : lastP->msPairDupIdx)
dupReadNum += ele.second.size();
for (auto &ele : lastP->msFragDupIdx)
dupReadNum += ele.second.size();
// 多轮遗留的
for (auto &ele : msPairDupIdx)
dupReadNum += ele.second.size();
// cout << "Finally unpaired read num: " << unPairedNum << endl;
// cout << "Finally dulicate read num: " << dupReadNum << endl;
// cout << "last unpaired num: " << lastP->umReadEnds.size() << endl;
int unpairedReads = 0;
for (auto &e : mvUnPaired)
unpairedReads += e.second.size();
// cout << "mvUnpaired size: " << mvUnPaired.size() << endl;
// cout << "mvUnpaired vector size: " << unpairedReads << endl;
// 最后一个数据处理完了
POSSESS(g_readyToReadLock);
g_bamUsedNum += lastP->vBam.size();
TWIST(g_readyToReadLock, TO, 1);
// cout << "last finish: " << seq - 1 << endl;
delete lastP;
pthread_exit(0);
}
/* 串行运行 */
void serialProcessData(BamBufType &inBamBuf)
{
ThMarkDupArg p({0,
0,
true,
true,
inBamBuf.Slice(0, inBamBuf.Size())});
thread_markdups(&p, 0);
/* generateDuplicateIndexes计算冗余read在所有read中的位置索引 */
unordered_set<int64_t> usUnpairedPos; // 该位置有还未找到pair的read
for (auto &ele : p.umReadEnds)
{
usUnpairedPos.insert(ele.second.posKey);
}
// 先处理 pair
int dupNum = 0;
vector<ReadEnds *> vRePotentialDup; // 有可能是冗余的reads
for (auto &e : p.mvPair) // 按比对的位置先后进行遍历
{
// 如果这个位置里所有的read没有缺少pair的那么就处理否则跳过
// if (usUnpairedPos.find(e.first) != usUnpairedPos.end())
// handlePairs(e.first, e.second, vRePotentialDup, &p.msPairDupIdx, &p.msPairOpticalDupIdx);
}
// int64_t estimateDupNum = 0;
// for (auto &e : p.mvPair)
// {
// int FF = 0;
// int FR = 0;
// int RF = 0;
// int RR = 0;
// for (auto &re : e.second)
// {
// if (re.orientation == ReadEnds::FF)
// FF++;
// else if (re.orientation == ReadEnds::FR)
// FR++;
// else if (re.orientation == ReadEnds::RF)
// RF++;
// else
// RR++;
// }
// if (FF > 1) {
// estimateDupNum += (FF - 1);
// }
// if (FR > 1)
// {
// estimateDupNum += (FR - 1);
// }
// if (RF > 1)
// {
// estimateDupNum += (RF - 1);
// }
// if (RR > 1)
// {
// estimateDupNum += (RR - 1);
// }
// }
// cout << "Estimate dup num: " << estimateDupNum << endl;
// cout << "Finally unpaired read num: " << p.umReadEnds.size() << endl;
int dupReadNum = 0;
for (auto &ele : p.msPairDupIdx)
dupReadNum += ele.second.size();
cout << "pair dulicate read num: " << dupReadNum << endl;
for (auto &ele : p.msFragDupIdx)
dupReadNum += ele.second.size();
cout << "Finally dulicate read num: " << dupReadNum << endl;
// int pairNum = 0, fragNum = 0;
// for (auto &e : p.mvPair)
// pairNum += e.second.size();
// for (auto &e : p.mvFrag)
// fragNum += e.second.size();
// cout << "pair num: " << pairNum << endl;
// cout << "frag num: " << fragNum << endl;
}
static void parallelMarkDups()
{
// /* 读取缓存初始化 */
BamBufType inBamBuf(g_gArg.use_asyncio);
inBamBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem);
/* 循环读入信息,并处理 */
g_maxJobNum = g_gArg.num_threads * 10;
// g_maxJobNum = g_gArg.num_threads * 3;
g_jobNumForRead = g_gArg.num_threads * 2;
int64_t x_all = 0; // for test
int64_t jobSeq = 0;
int64_t processedBamNum = 0; // 记录每个轮次累计处理的reads数量用来计算每个read在整个文件中的索引位置
threadpool thpool;
thread *mergeth;
if (g_gArg.num_threads == 1)
{
g_serialProcess = true;
}
else
{
thpool = thpool_init(g_gArg.num_threads); // 创建mark dup所需的线程池
mergeth = LAUNCH(thread_merge, nullptr); // 启动处理结果的的线程,合并所有线程的结果
}
const int minReadNum = 1000000; // 最小并行处理的read数量
int bamRemainSize = 0; // 上一轮还剩下的bam数量包含已经在任务里的和没有放进任务的
int numReadsForEachJob = 0; // 每个线程处理的read数量第一次读取的时候进行设置
int lastRoundUnProcessed = 0; // 上一轮没有放进任务里的read数量
int curRoundProcessed = 0; // 这一轮放进任务的read数量
while (inBamBuf.ReadStat() >= 0)
{
/* 读取bam文件中的read */
size_t readNum = inBamBuf.ReadBam();
cout << readNum << endl; // 这一轮读取的bam数量
// if (readNum <= minReadNum)
// {
// serialProcess = true;
// // 调用串行运行代码
// serialProcessData(inBamBuf);
// break;
// }
if (g_serialProcess)
{
tm_arr[0].acc_start();
serialProcessData(inBamBuf);
tm_arr[0].acc_end();
inBamBuf.ClearAll();
continue;
}
if (numReadsForEachJob == 0)
numReadsForEachJob = readNum / g_maxJobNum; // 第一次读取bam的时候进行设置
g_bamLoadedNum += readNum;
/* 多线程处理 任务数是线程数的10倍 */
curRoundProcessed = 0; // 当前轮次已经处理的reads数量
int numNeedToProcess = inBamBuf.Size() - bamRemainSize + lastRoundUnProcessed; // 当前需要处理的bam数量
for (int i = 0; numNeedToProcess >= numReadsForEachJob; ++i) // 只有待处理的reads数量大于一次任务的数量时新建任务
{
int startIdx = i * numReadsForEachJob + bamRemainSize - lastRoundUnProcessed;
int endIdx = (i + 1) * numReadsForEachJob + bamRemainSize - lastRoundUnProcessed;
ThMarkDupArg *thArg = new ThMarkDupArg({processedBamNum + curRoundProcessed,
jobSeq++,
true,
false,
inBamBuf.Slice(startIdx, endIdx)});
POSSESS(g_queueFirstLock); // 加锁
g_qpThMarkDupArg.push(thArg); // 将新任务需要的参数添加到队列
RELEASE(g_queueFirstLock); // 解锁
thpool_add_work(thpool, thread_markdups, (void *)thArg); // 添加新任务
curRoundProcessed += endIdx - startIdx;
numNeedToProcess -= numReadsForEachJob;
}
processedBamNum += curRoundProcessed;
lastRoundUnProcessed = numNeedToProcess;
/* 等待可以继续读取的信号 */
POSSESS(g_readyToReadLock);
WAIT_FOR(g_readyToReadLock, TO_BE, 1);
bamRemainSize = g_bamLoadedNum - g_bamUsedNum;
while (bamRemainSize >= inBamBuf.Size() / 2)
{ // 要保留的多于现在有的bam数量的一半那就等待write线程继续处理
TWIST(g_readyToReadLock, TO, 0);
POSSESS(g_readyToReadLock);
WAIT_FOR(g_readyToReadLock, TO_BE, 1);
bamRemainSize = g_bamLoadedNum - g_bamUsedNum;
}
inBamBuf.ClearBeforeIdx(inBamBuf.Size() - bamRemainSize); // 清理掉已经处理完的reads
// cout << g_bamLoadedNum << '\t' << g_bamUsedNum << '\t' << bamRemainSize << '\t' << inBamBuf.Size() << endl;
TWIST(g_readyToReadLock, TO, 0);
}
if (!g_serialProcess)
{
/* 数据读完了放一个空的任务好让write thread停下来 */
ThMarkDupArg *thArg = nullptr;
if (lastRoundUnProcessed > 0) // 最后一轮还有没有添加进任务的read数据
{
thArg = new ThMarkDupArg({processedBamNum + curRoundProcessed, jobSeq++, false, false,
inBamBuf.Slice(inBamBuf.Size() - lastRoundUnProcessed, inBamBuf.Size())});
processedBamNum += lastRoundUnProcessed;
}
else
{
thArg = new ThMarkDupArg({0, jobSeq++, false, false});
}
POSSESS(g_queueFirstLock); // 加锁
g_qpThMarkDupArg.push(thArg); // 将新任务需要的参数添加到队列
RELEASE(g_queueFirstLock); // 解锁
thpool_add_work(thpool, thread_markdups, (void *)thArg); // 添加新任务
/* 同步所有线程 */
thpool_wait(thpool);
thpool_destroy(thpool);
JOIN(mergeth);
}
inBamBuf.FreeMemory();
cout << "x_all: " << x_all << endl;
cout << "loaded: " << g_bamLoadedNum << endl;
cout << " used: " << g_bamUsedNum << endl;
cout << "processedBamNum: " << processedBamNum << endl;
}