|
|
|
@ -74,17 +74,20 @@ static void markDupsForPairs(vector<const ReadEnds *> &vpRe, DPSet<DupInfo> *dup
|
|
|
|
// 这个统计可能会有误差,因为当前位点可能还有没匹配上的read,导致当前位点的read(paired)数量为1
|
|
|
|
// 这个统计可能会有误差,因为当前位点可能还有没匹配上的read,导致当前位点的read(paired)数量为1
|
|
|
|
// 可以通过后续的补充计算来解决这个问题,有必要么?好像没必要
|
|
|
|
// 可以通过后续的补充计算来解决这个问题,有必要么?好像没必要
|
|
|
|
// gMetrics.singletonReads.insert(vpRe[0]->read1IndexInFile);
|
|
|
|
// gMetrics.singletonReads.insert(vpRe[0]->read1IndexInFile);
|
|
|
|
singletonIdx->insert(vpRe[0]->read1IndexInFile);
|
|
|
|
// singletonIdx->insert(vpRe[0]->read1IndexInFile);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return;
|
|
|
|
return;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
for (auto pe : vpRe) {
|
|
|
|
for (auto pe : vpRe) {
|
|
|
|
// gMetrics.singletonReads.erase(pe->read1IndexInFile);
|
|
|
|
// gMetrics.singletonReads.erase(pe->read1IndexInFile);
|
|
|
|
|
|
|
|
if (pe->read1IndexInFile == 1208593132 || pe->read2IndexInFile == 1208593132) {
|
|
|
|
|
|
|
|
cout << 1208593132 << "\t" << (notDupIdx == nullptr) << endl;
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
if (notSingletonIdx != nullptr) {
|
|
|
|
if (notSingletonIdx != nullptr) {
|
|
|
|
for (auto pe : vpRe) {
|
|
|
|
for (auto pe : vpRe) {
|
|
|
|
notSingletonIdx->insert(pe->read1IndexInFile);
|
|
|
|
// notSingletonIdx->insert(pe->read1IndexInFile);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@ -324,7 +327,7 @@ static void generateReadEnds(ParallelDataArg &pdArg, MarkDupDataArg *arg) {
|
|
|
|
|
|
|
|
|
|
|
|
pdArg.mdArg = arg;
|
|
|
|
pdArg.mdArg = arg;
|
|
|
|
tm_arr[8].acc_start();
|
|
|
|
tm_arr[8].acc_start();
|
|
|
|
kt_for(g_gArg.num_threads, threadGenerateReadEnds, &pdArg, g_gArg.num_threads);
|
|
|
|
kt_for(g_gArg.num_threads / 2, threadGenerateReadEnds, &pdArg, g_gArg.num_threads);
|
|
|
|
// tm_arr[8].acc_end();
|
|
|
|
// tm_arr[8].acc_end();
|
|
|
|
// 合并各个线程数据
|
|
|
|
// 合并各个线程数据
|
|
|
|
// 查找未匹配的frags
|
|
|
|
// 查找未匹配的frags
|
|
|
|
@ -948,7 +951,6 @@ static void handleLastTask(MarkDupDataArg *task, GlobalDataArg *gDataArg) {
|
|
|
|
auto &vSingletonIdx = g.singletonIdxArr.back();
|
|
|
|
auto &vSingletonIdx = g.singletonIdxArr.back();
|
|
|
|
vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end());
|
|
|
|
vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end());
|
|
|
|
std::sort(vSingletonIdx.begin(), vSingletonIdx.end());
|
|
|
|
std::sort(vSingletonIdx.begin(), vSingletonIdx.end());
|
|
|
|
gSingletonNum += lp.pairSingletonIdx.size();
|
|
|
|
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
// for step 2 generate read ends
|
|
|
|
// for step 2 generate read ends
|
|
|
|
@ -957,7 +959,7 @@ static void handleLastTask(MarkDupDataArg *task, GlobalDataArg *gDataArg) {
|
|
|
|
static void mtGenerateReadEnds(void *data, long idx, int tid) {
|
|
|
|
static void mtGenerateReadEnds(void *data, long idx, int tid) {
|
|
|
|
auto &p = *(PipelineArg *)data;
|
|
|
|
auto &p = *(PipelineArg *)data;
|
|
|
|
auto &rnParser = g_vRnParser[idx];
|
|
|
|
auto &rnParser = g_vRnParser[idx];
|
|
|
|
int nThread = g_gArg.num_threads;
|
|
|
|
int nThread = p.genREThreadNum;
|
|
|
|
auto &bams = p.readData.bams;
|
|
|
|
auto &bams = p.readData.bams;
|
|
|
|
int64_t bamStartIdx = p.readData.bamStartIdx;
|
|
|
|
int64_t bamStartIdx = p.readData.bamStartIdx;
|
|
|
|
int64_t taskSeq = p.readData.taskSeq;
|
|
|
|
int64_t taskSeq = p.readData.taskSeq;
|
|
|
|
@ -1002,101 +1004,116 @@ static void mtGenerateReadEnds(void *data, long idx, int tid) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
static void doGenRE(PipelineArg &pipeArg) {
|
|
|
|
static void doGenRE(PipelineArg &pipeArg) {
|
|
|
|
|
|
|
|
// return;
|
|
|
|
GenREData &genREData = pipeArg.genREData[pipeArg.genREOrder % pipeArg.GENBUFNUM];
|
|
|
|
GenREData &genREData = pipeArg.genREData[pipeArg.genREOrder % pipeArg.GENBUFNUM];
|
|
|
|
ReadData &readData = pipeArg.readData;
|
|
|
|
ReadData &readData = pipeArg.readData;
|
|
|
|
// 用来在genRE阶段计算一些Sort阶段应该计算的数据,保持每个step用时更平衡
|
|
|
|
|
|
|
|
SortData &sortData = pipeArg.sortData[pipeArg.genREOrder % pipeArg.SORTBUFNUM];
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// cout << "dogen-gen buf : " << pipeArg.genREOrder % pipeArg.GENBUFNUM << endl;
|
|
|
|
// cout << "dogen-gen buf : " << pipeArg.genREOrder % pipeArg.GENBUFNUM << endl;
|
|
|
|
// cout << "dogen-sort buf:" << pipeArg.genREOrder % pipeArg.SORTBUFNUM << endl;
|
|
|
|
// cout << "dogen-sort buf:" << pipeArg.genREOrder % pipeArg.SORTBUFNUM << endl;
|
|
|
|
|
|
|
|
|
|
|
|
// sortData.pSmd->unpairedDic.clear();
|
|
|
|
|
|
|
|
// sortData.pSmd->unpairedPosArr.clear();
|
|
|
|
|
|
|
|
// generate read ends
|
|
|
|
// generate read ends
|
|
|
|
kt_for(g_gArg.num_threads, mtGenerateReadEnds, &pipeArg, g_gArg.num_threads);
|
|
|
|
const int numThread = pipeArg.genREThreadNum;
|
|
|
|
|
|
|
|
// / 4;
|
|
|
|
|
|
|
|
kt_for(numThread, mtGenerateReadEnds, &pipeArg, numThread);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#if 1
|
|
|
|
|
|
|
|
// 用来在genRE阶段计算一些Sort阶段应该计算的数据,保持每个step用时更平衡
|
|
|
|
// 轮询每个线程中未找到匹配的read,找到匹配的
|
|
|
|
// 轮询每个线程中未找到匹配的read,找到匹配的
|
|
|
|
// vector<ReadEnds> &pairs = genREData.pairsArr[g_gArg.num_threads];
|
|
|
|
genREData.unpairedDic.clear();
|
|
|
|
// pairs.clear();
|
|
|
|
genREData.unpairedPosArr.clear();
|
|
|
|
// for (int i = 0; i < g_gArg.num_threads; ++i) {
|
|
|
|
|
|
|
|
// for (auto &val : genREData.unpairedDicArr[i]) {
|
|
|
|
|
|
|
|
// const string &key = val.first;
|
|
|
|
|
|
|
|
// const ReadEnds &fragEnd = val.second.unpairedRE;
|
|
|
|
|
|
|
|
// if (sortData.pSmd->unpairedDic.find(key) == sortData.pSmd->unpairedDic.end()) {
|
|
|
|
|
|
|
|
// sortData.pSmd->unpairedDic[key] = {readData.taskSeq, fragEnd};
|
|
|
|
|
|
|
|
// } else { // 找到了pairend
|
|
|
|
|
|
|
|
// auto &pairedEnds = sortData.pSmd->unpairedDic.at(key).unpairedRE;
|
|
|
|
|
|
|
|
// modifyPairedEnds(fragEnd, &pairedEnds);
|
|
|
|
|
|
|
|
// pairs.push_back(pairedEnds);
|
|
|
|
|
|
|
|
// sortData.pSmd->unpairedDic.erase(key); // 删除找到的pairend
|
|
|
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
// sort(pairs.begin(), pairs.end());
|
|
|
|
|
|
|
|
//
|
|
|
|
|
|
|
|
// // create unpaired info
|
|
|
|
|
|
|
|
// for (auto &e : sortData.pSmd->unpairedDic) {
|
|
|
|
|
|
|
|
// auto posKey = e.second.unpairedRE.posKey;
|
|
|
|
|
|
|
|
// auto &unpairArrInfo = sortData.pSmd->unpairedPosArr[posKey];
|
|
|
|
|
|
|
|
// unpairArrInfo.unpairedNum++;
|
|
|
|
|
|
|
|
// unpairArrInfo.taskSeq = readData.taskSeq;
|
|
|
|
|
|
|
|
// unpairArrInfo.readNameSet.insert(e.first);
|
|
|
|
|
|
|
|
// }
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
// end for step 2 generate read ends
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// for step-3 sort
|
|
|
|
|
|
|
|
static void doSort(PipelineArg &pipeArg) {
|
|
|
|
|
|
|
|
GenREData &genREData = pipeArg.genREData[pipeArg.sortOrder % pipeArg.GENBUFNUM];
|
|
|
|
|
|
|
|
SortData &sortData = pipeArg.sortData[pipeArg.sortOrder % pipeArg.SORTBUFNUM];
|
|
|
|
|
|
|
|
// cout << "dosort-gen buf : " << pipeArg.sortOrder % pipeArg.GENBUFNUM << endl;
|
|
|
|
|
|
|
|
// cout << "dosort-sort buf:" << pipeArg.sortOrder % pipeArg.SORTBUFNUM << endl;
|
|
|
|
|
|
|
|
auto pSmd = (SortMarkData*) sortData.pSmd;
|
|
|
|
|
|
|
|
pSmd->pairs.clear();
|
|
|
|
|
|
|
|
pSmd->frags.clear();
|
|
|
|
|
|
|
|
pSmd->unpairedDic.clear();
|
|
|
|
|
|
|
|
pSmd->unpairedPosArr.clear();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
vector<ReadEnds> &pairs = genREData.pairsArr[g_gArg.num_threads];
|
|
|
|
vector<ReadEnds> &pairs = genREData.pairsArr[g_gArg.num_threads];
|
|
|
|
pairs.clear();
|
|
|
|
pairs.clear();
|
|
|
|
for (int i = 0; i < g_gArg.num_threads; ++i) {
|
|
|
|
for (int i = 0; i < g_gArg.num_threads; ++i) {
|
|
|
|
for (auto &val : genREData.unpairedDicArr[i]) {
|
|
|
|
for (auto &val : genREData.unpairedDicArr[i]) {
|
|
|
|
const string &key = val.first;
|
|
|
|
const string &key = val.first;
|
|
|
|
const ReadEnds &fragEnd = val.second.unpairedRE;
|
|
|
|
const ReadEnds &fragEnd = val.second.unpairedRE;
|
|
|
|
if (pSmd->unpairedDic.find(key) == pSmd->unpairedDic.end()) {
|
|
|
|
if (genREData.unpairedDic.find(key) == genREData.unpairedDic.end()) {
|
|
|
|
pSmd->unpairedDic[key] = {(int64_t)pipeArg.sortOrder, fragEnd};
|
|
|
|
genREData.unpairedDic[key] = {readData.taskSeq, fragEnd};
|
|
|
|
} else { // 找到了pairend
|
|
|
|
} else { // 找到了pairend
|
|
|
|
auto &pairedEnds = pSmd->unpairedDic.at(key).unpairedRE;
|
|
|
|
auto &pairedEnds = genREData.unpairedDic.at(key).unpairedRE;
|
|
|
|
modifyPairedEnds(fragEnd, &pairedEnds);
|
|
|
|
modifyPairedEnds(fragEnd, &pairedEnds);
|
|
|
|
pairs.push_back(pairedEnds);
|
|
|
|
pairs.push_back(pairedEnds);
|
|
|
|
pSmd->unpairedDic.erase(key); // 删除找到的pairend
|
|
|
|
genREData.unpairedDic.erase(key); // 删除找到的pairend
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
sort(pairs.begin(), pairs.end());
|
|
|
|
sort(pairs.begin(), pairs.end());
|
|
|
|
|
|
|
|
|
|
|
|
// create unpaired info
|
|
|
|
// create unpaired info
|
|
|
|
for (auto &e : pSmd->unpairedDic) {
|
|
|
|
for (auto &e : genREData.unpairedDic) {
|
|
|
|
auto posKey = e.second.unpairedRE.posKey;
|
|
|
|
auto posKey = e.second.unpairedRE.posKey;
|
|
|
|
auto &unpairArrInfo = pSmd->unpairedPosArr[posKey];
|
|
|
|
auto &unpairArrInfo = genREData.unpairedPosArr[posKey];
|
|
|
|
|
|
|
|
unpairArrInfo.unpairedNum++;
|
|
|
|
|
|
|
|
unpairArrInfo.taskSeq = readData.taskSeq;
|
|
|
|
|
|
|
|
unpairArrInfo.readNameSet.insert(e.first);
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
// end for step 2 generate read ends
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// for step-3 sort
|
|
|
|
|
|
|
|
static void doSort(PipelineArg &pipeArg) {
|
|
|
|
|
|
|
|
// return;
|
|
|
|
|
|
|
|
GenREData &genREData = pipeArg.genREData[pipeArg.sortOrder % pipeArg.GENBUFNUM];
|
|
|
|
|
|
|
|
SortData &sortData = pipeArg.sortData[pipeArg.sortOrder % pipeArg.SORTBUFNUM];
|
|
|
|
|
|
|
|
// cout << "dosort-gen buf : " << pipeArg.sortOrder % pipeArg.GENBUFNUM << endl;
|
|
|
|
|
|
|
|
// cout << "dosort-sort buf:" << pipeArg.sortOrder % pipeArg.SORTBUFNUM << endl;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#if 1
|
|
|
|
|
|
|
|
sortData.CopyGenREData(genREData);
|
|
|
|
|
|
|
|
#else
|
|
|
|
|
|
|
|
sortData.unpairedDic.clear();
|
|
|
|
|
|
|
|
sortData.unpairedPosArr.clear();
|
|
|
|
|
|
|
|
const int numThread = pipeArg.genREThreadNum;
|
|
|
|
|
|
|
|
// / 4;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
vector<ReadEnds> &pairs = genREData.pairsArr[numThread];
|
|
|
|
|
|
|
|
pairs.clear();
|
|
|
|
|
|
|
|
for (int i = 0; i < numThread; ++i) {
|
|
|
|
|
|
|
|
for (auto &val : genREData.unpairedDicArr[i]) {
|
|
|
|
|
|
|
|
const string &key = val.first;
|
|
|
|
|
|
|
|
const ReadEnds &fragEnd = val.second.unpairedRE;
|
|
|
|
|
|
|
|
if (sortData.unpairedDic.find(key) == sortData.unpairedDic.end()) {
|
|
|
|
|
|
|
|
sortData.unpairedDic[key] = {(int64_t)pipeArg.sortOrder, fragEnd};
|
|
|
|
|
|
|
|
} else { // 找到了pairend
|
|
|
|
|
|
|
|
auto &pairedEnds = sortData.unpairedDic.at(key).unpairedRE;
|
|
|
|
|
|
|
|
modifyPairedEnds(fragEnd, &pairedEnds);
|
|
|
|
|
|
|
|
pairs.push_back(pairedEnds);
|
|
|
|
|
|
|
|
sortData.unpairedDic.erase(key); // 删除找到的pairend
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
sort(pairs.begin(), pairs.end());
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// create unpaired info
|
|
|
|
|
|
|
|
for (auto &e : sortData.unpairedDic) {
|
|
|
|
|
|
|
|
auto posKey = e.second.unpairedRE.posKey;
|
|
|
|
|
|
|
|
auto &unpairArrInfo = sortData.unpairedPosArr[posKey];
|
|
|
|
unpairArrInfo.unpairedNum++;
|
|
|
|
unpairArrInfo.unpairedNum++;
|
|
|
|
unpairArrInfo.taskSeq = pipeArg.sortOrder;
|
|
|
|
unpairArrInfo.taskSeq = pipeArg.sortOrder;
|
|
|
|
unpairArrInfo.readNameSet.insert(e.first);
|
|
|
|
unpairArrInfo.readNameSet.insert(e.first);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
sortData.pairs.clear();
|
|
|
|
|
|
|
|
sortData.frags.clear();
|
|
|
|
const ReadEnds *pRE;
|
|
|
|
const ReadEnds *pRE;
|
|
|
|
ReadEndsHeap pairsHeap, fragsHeap;
|
|
|
|
ReadEndsHeap pairsHeap, fragsHeap;
|
|
|
|
pairsHeap.Init(&genREData.pairsArr);
|
|
|
|
pairsHeap.Init(&genREData.pairsArr);
|
|
|
|
while ((pRE = pairsHeap.Pop()) != nullptr) {
|
|
|
|
while ((pRE = pairsHeap.Pop()) != nullptr) {
|
|
|
|
pSmd->pairs.push_back(*pRE);
|
|
|
|
sortData.pairs.push_back(*pRE);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
fragsHeap.Init(&genREData.fragsArr);
|
|
|
|
fragsHeap.Init(&genREData.fragsArr);
|
|
|
|
while ((pRE = fragsHeap.Pop()) != nullptr) {
|
|
|
|
while ((pRE = fragsHeap.Pop()) != nullptr) {
|
|
|
|
pSmd->frags.push_back(*pRE);
|
|
|
|
sortData.frags.push_back(*pRE);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// for step-4 sort
|
|
|
|
// for step-4 sort
|
|
|
|
static void doMarkDup(PipelineArg &pipeArg) {
|
|
|
|
static void doMarkDup(PipelineArg &pipeArg) {
|
|
|
|
|
|
|
|
// return;
|
|
|
|
MarkDupData &mdData = pipeArg.markDupData[pipeArg.markDupOrder % pipeArg.MARKBUFNUM];
|
|
|
|
MarkDupData &mdData = pipeArg.markDupData[pipeArg.markDupOrder % pipeArg.MARKBUFNUM];
|
|
|
|
SortData &sortData = pipeArg.sortData[pipeArg.markDupOrder % pipeArg.SORTBUFNUM];
|
|
|
|
SortData &sortData = pipeArg.sortData[pipeArg.markDupOrder % pipeArg.SORTBUFNUM];
|
|
|
|
mdData.taskSeq = pipeArg.markDupOrder;
|
|
|
|
mdData.taskSeq = pipeArg.markDupOrder;
|
|
|
|
@ -1106,29 +1123,15 @@ static void doMarkDup(PipelineArg &pipeArg) {
|
|
|
|
mdData.pairRepIdx.clear();
|
|
|
|
mdData.pairRepIdx.clear();
|
|
|
|
mdData.pairSingletonIdx.clear();
|
|
|
|
mdData.pairSingletonIdx.clear();
|
|
|
|
|
|
|
|
|
|
|
|
//cout << "doMarkDup-mark buf : " << pipeArg.markDupOrder % pipeArg.MARKBUFNUM << endl;
|
|
|
|
|
|
|
|
//cout << "doMarkDup-sort buf:" << pipeArg.markDupOrder % pipeArg.SORTBUFNUM << endl;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tm_arr[5].acc_start();
|
|
|
|
tm_arr[5].acc_start();
|
|
|
|
// auto tmpPt = sortData.pSmd;
|
|
|
|
mdData.CopySortData(sortData);
|
|
|
|
// sortData.pSmd = mdData.pSmd;
|
|
|
|
|
|
|
|
// mdData.pSmd = tmpPt;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
auto mdpSmd = (SortMarkData *)mdData.pSmd;
|
|
|
|
|
|
|
|
auto stpSmd = (SortMarkData *)sortData.pSmd;
|
|
|
|
|
|
|
|
mdpSmd->frags = stpSmd->frags;
|
|
|
|
|
|
|
|
mdpSmd->pairs = stpSmd->pairs;
|
|
|
|
|
|
|
|
mdpSmd->unpairedDic = stpSmd->unpairedDic;
|
|
|
|
|
|
|
|
mdpSmd->unpairedPosArr = stpSmd->unpairedPosArr;
|
|
|
|
|
|
|
|
//cout << mdData.pSmd->frags.size() << "\t" << mdData.pSmd->pairs.size() << "\t" << mdData.pSmd->unpairedDic.size()
|
|
|
|
|
|
|
|
// << "\t" << mdData.pSmd->unpairedPosArr.size() << endl;
|
|
|
|
|
|
|
|
tm_arr[5].acc_end();
|
|
|
|
tm_arr[5].acc_end();
|
|
|
|
|
|
|
|
|
|
|
|
// 先处理 pair
|
|
|
|
// 先处理 pair
|
|
|
|
processPairs(mdpSmd->pairs, &mdData.pairDupIdx, &mdData.pairOpticalDupIdx, &mdData.pairRepIdx,
|
|
|
|
processPairs(mdData.pairs, &mdData.pairDupIdx, &mdData.pairOpticalDupIdx, &mdData.pairRepIdx,
|
|
|
|
&mdData.pairSingletonIdx);
|
|
|
|
&mdData.pairSingletonIdx);
|
|
|
|
// 再处理frag
|
|
|
|
// 再处理frag
|
|
|
|
processFrags(mdpSmd->frags, &mdData.fragDupIdx);
|
|
|
|
processFrags(mdData.frags, &mdData.fragDupIdx);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
template <typename T>
|
|
|
|
template <typename T>
|
|
|
|
@ -1151,12 +1154,13 @@ static void refreshMarkDupData(DPSet<DupInfo> &dupIdx, MDSet<int64_t> &opticalDu
|
|
|
|
MDSet<int64_t> ¬OpticalDupIdx, MDSet<int64_t> ¬RepIdx,
|
|
|
|
MDSet<int64_t> ¬OpticalDupIdx, MDSet<int64_t> ¬RepIdx,
|
|
|
|
MDSet<int64_t> ¬SingletonIdx, MarkDupData &lp, MarkDupData &p) {
|
|
|
|
MDSet<int64_t> ¬SingletonIdx, MarkDupData &lp, MarkDupData &p) {
|
|
|
|
refreshDupIdx(dupIdx, lp.pairDupIdx, p.pairDupIdx);
|
|
|
|
refreshDupIdx(dupIdx, lp.pairDupIdx, p.pairDupIdx);
|
|
|
|
refreshNotDupIdx(notDupIdx, lp.pairDupIdx, p.pairDupIdx);
|
|
|
|
|
|
|
|
refreshDupIdx(opticalDupIdx, lp.pairOpticalDupIdx, p.pairOpticalDupIdx);
|
|
|
|
refreshDupIdx(opticalDupIdx, lp.pairOpticalDupIdx, p.pairOpticalDupIdx);
|
|
|
|
refreshNotDupIdx(notOpticalDupIdx, lp.pairOpticalDupIdx, p.pairOpticalDupIdx);
|
|
|
|
|
|
|
|
refreshDupIdx(repIdx, lp.pairRepIdx, p.pairRepIdx);
|
|
|
|
refreshDupIdx(repIdx, lp.pairRepIdx, p.pairRepIdx);
|
|
|
|
refreshNotDupIdx(notRepIdx, lp.pairRepIdx, p.pairRepIdx);
|
|
|
|
|
|
|
|
refreshDupIdx(singletonIdx, lp.pairSingletonIdx, p.pairSingletonIdx);
|
|
|
|
refreshDupIdx(singletonIdx, lp.pairSingletonIdx, p.pairSingletonIdx);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
refreshNotDupIdx(notDupIdx, lp.pairDupIdx, p.pairDupIdx);
|
|
|
|
|
|
|
|
refreshNotDupIdx(notOpticalDupIdx, lp.pairOpticalDupIdx, p.pairOpticalDupIdx);
|
|
|
|
|
|
|
|
refreshNotDupIdx(notRepIdx, lp.pairRepIdx, p.pairRepIdx);
|
|
|
|
refreshNotDupIdx(notSingletonIdx, lp.pairSingletonIdx, p.pairSingletonIdx);
|
|
|
|
refreshNotDupIdx(notSingletonIdx, lp.pairSingletonIdx, p.pairSingletonIdx);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
@ -1169,10 +1173,6 @@ static void doIntersect(PipelineArg &pipeArg) {
|
|
|
|
MarkDupData &lp = pipeArg.markDupData[(pipeArg.intersectOrder - 1) % pipeArg.MARKBUFNUM];
|
|
|
|
MarkDupData &lp = pipeArg.markDupData[(pipeArg.intersectOrder - 1) % pipeArg.MARKBUFNUM];
|
|
|
|
MarkDupData &p = pipeArg.markDupData[pipeArg.intersectOrder % pipeArg.MARKBUFNUM];
|
|
|
|
MarkDupData &p = pipeArg.markDupData[pipeArg.intersectOrder % pipeArg.MARKBUFNUM];
|
|
|
|
|
|
|
|
|
|
|
|
// cout << "dointer-mark buf : " << (pipeArg.intersectOrder - 1) % pipeArg.MARKBUFNUM << "\t"
|
|
|
|
|
|
|
|
// << pipeArg.intersectOrder % pipeArg.MARKBUFNUM << endl;
|
|
|
|
|
|
|
|
// cout << "sort buf:" << pipeArg.sortOrder % pipeArg.SORTBUFNUM << endl;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
vector<ReadEnds> reArr;
|
|
|
|
vector<ReadEnds> reArr;
|
|
|
|
DPSet<DupInfo> dupIdx;
|
|
|
|
DPSet<DupInfo> dupIdx;
|
|
|
|
MDSet<int64_t> opticalDupIdx;
|
|
|
|
MDSet<int64_t> opticalDupIdx;
|
|
|
|
@ -1182,12 +1182,10 @@ static void doIntersect(PipelineArg &pipeArg) {
|
|
|
|
MDSet<int64_t> notDupIdx;
|
|
|
|
MDSet<int64_t> notDupIdx;
|
|
|
|
MDSet<int64_t> notRepIdx;
|
|
|
|
MDSet<int64_t> notRepIdx;
|
|
|
|
MDSet<int64_t> notSingletonIdx;
|
|
|
|
MDSet<int64_t> notSingletonIdx;
|
|
|
|
auto lppSmd = (SortMarkData *)lp.pSmd;
|
|
|
|
|
|
|
|
auto ppSmd = (SortMarkData *)p.pSmd;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 先处理重叠的frags
|
|
|
|
// 先处理重叠的frags
|
|
|
|
// cout << "frags: " << lp.pSmd->frags.size() << "\t" << p.pSmd->frags.size() << endl;
|
|
|
|
// cout << "frags: " << lp.pSmd->frags.size() << "\t" << p.pSmd->frags.size() << endl;
|
|
|
|
getIntersectData(lppSmd->frags, ppSmd->frags, &reArr);
|
|
|
|
getIntersectData(lp.frags, p.frags, &reArr);
|
|
|
|
processFrags(reArr, &dupIdx, ¬DupIdx);
|
|
|
|
processFrags(reArr, &dupIdx, ¬DupIdx);
|
|
|
|
refreshDupIdx(dupIdx, lp.fragDupIdx, p.fragDupIdx);
|
|
|
|
refreshDupIdx(dupIdx, lp.fragDupIdx, p.fragDupIdx);
|
|
|
|
refreshNotDupIdx(notDupIdx, lp.fragDupIdx, p.fragDupIdx);
|
|
|
|
refreshNotDupIdx(notDupIdx, lp.fragDupIdx, p.fragDupIdx);
|
|
|
|
@ -1197,7 +1195,7 @@ static void doIntersect(PipelineArg &pipeArg) {
|
|
|
|
reArr.clear();
|
|
|
|
reArr.clear();
|
|
|
|
dupIdx.clear();
|
|
|
|
dupIdx.clear();
|
|
|
|
notDupIdx.clear();
|
|
|
|
notDupIdx.clear();
|
|
|
|
getIntersectData(lppSmd->pairs, ppSmd->pairs, &reArr, true);
|
|
|
|
getIntersectData(lp.pairs, p.pairs, &reArr, true);
|
|
|
|
// cout << "pairs size: " << lp.pSmd->pairs.size() << "\t" << p.pSmd->pairs.size() << endl;
|
|
|
|
// cout << "pairs size: " << lp.pSmd->pairs.size() << "\t" << p.pSmd->pairs.size() << endl;
|
|
|
|
// cout << "range: " << lp.pSmd->pairs.front().posKey << "\t" << lp.pSmd->pairs.back().posKey << "\t"
|
|
|
|
// cout << "range: " << lp.pSmd->pairs.front().posKey << "\t" << lp.pSmd->pairs.back().posKey << "\t"
|
|
|
|
// << p.pSmd->pairs.front().posKey << "\t" << p.pSmd->pairs.back().posKey << endl;
|
|
|
|
// << p.pSmd->pairs.front().posKey << "\t" << p.pSmd->pairs.back().posKey << endl;
|
|
|
|
@ -1206,24 +1204,23 @@ static void doIntersect(PipelineArg &pipeArg) {
|
|
|
|
¬SingletonIdx);
|
|
|
|
¬SingletonIdx);
|
|
|
|
refreshMarkDupData(dupIdx, opticalDupIdx, repIdx, singletonIdx, notDupIdx, notOpticalDupIdx, notRepIdx,
|
|
|
|
refreshMarkDupData(dupIdx, opticalDupIdx, repIdx, singletonIdx, notDupIdx, notOpticalDupIdx, notRepIdx,
|
|
|
|
notSingletonIdx, lp, p);
|
|
|
|
notSingletonIdx, lp, p);
|
|
|
|
/*
|
|
|
|
|
|
|
|
// return;
|
|
|
|
// return;
|
|
|
|
// 处理之前未匹配的部分
|
|
|
|
// 处理之前未匹配的部分
|
|
|
|
map<CalcKey, int64_t> recalcPos;
|
|
|
|
map<CalcKey, int64_t> recalcPos;
|
|
|
|
CalcSet<CalcKey> alreadyAdd; // 与该位点相同的pair都添加到数组里了
|
|
|
|
CalcSet<CalcKey> alreadyAdd; // 与该位点相同的pair都添加到数组里了
|
|
|
|
MDSet<int64_t> addToGlobal;
|
|
|
|
MDSet<int64_t> addToGlobal;
|
|
|
|
int64_t prevLastPos = 0, nextFirstPos = 0;
|
|
|
|
int64_t prevLastPos = 0, nextFirstPos = 0;
|
|
|
|
if (lppSmd->frags.size() > 0)
|
|
|
|
if (lp.frags.size() > 0) prevLastPos = lp.frags.back().posKey;
|
|
|
|
prevLastPos = lppSmd->frags.back().posKey;
|
|
|
|
if (p.frags.size() > 0) nextFirstPos = p.frags[0].posKey;
|
|
|
|
if (ppSmd->frags.size() > 0)
|
|
|
|
//cout << "dic size: " << lp.unpairedDic.size() << "\t" << p.unpairedDic.size() << "\t" << g.unpairedDic.size()
|
|
|
|
nextFirstPos = ppSmd->frags[0].posKey;
|
|
|
|
// << endl;
|
|
|
|
for (auto &prevUnpair : lppSmd->unpairedDic) { // 遍历上一个任务中的每个未匹配的read
|
|
|
|
for (auto &prevUnpair : lp.unpairedDic) { // 遍历上一个任务中的每个未匹配的read
|
|
|
|
auto &readName = prevUnpair.first;
|
|
|
|
auto &readName = prevUnpair.first;
|
|
|
|
auto &prevPosInfo = prevUnpair.second;
|
|
|
|
auto &prevPosInfo = prevUnpair.second;
|
|
|
|
auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end
|
|
|
|
auto prevFragEnd = prevPosInfo.unpairedRE; // 未匹配的read end
|
|
|
|
|
|
|
|
if (p.unpairedDic.find(readName) != p.unpairedDic.end()) { // 在当前这个任务里找到了这个未匹配的read
|
|
|
|
if (ppSmd->unpairedDic.find(readName) != ppSmd->unpairedDic.end()) { // 在当前这个任务里找到了这个未匹配的read
|
|
|
|
auto &nextPosInfo = p.unpairedDic[readName];
|
|
|
|
auto &nextPosInfo = ppSmd->unpairedDic[readName];
|
|
|
|
|
|
|
|
auto &nextFragEnd = nextPosInfo.unpairedRE;
|
|
|
|
auto &nextFragEnd = nextPosInfo.unpairedRE;
|
|
|
|
int64_t prevPosKey = prevFragEnd.posKey;
|
|
|
|
int64_t prevPosKey = prevFragEnd.posKey;
|
|
|
|
modifyPairedEnds(nextFragEnd, &prevFragEnd); // 在某些clip情况下,poskey可能是后面的read
|
|
|
|
modifyPairedEnds(nextFragEnd, &prevFragEnd); // 在某些clip情况下,poskey可能是后面的read
|
|
|
|
@ -1231,11 +1228,11 @@ static void doIntersect(PipelineArg &pipeArg) {
|
|
|
|
CalcKey ck = {prevPosKey, nextPosKey};
|
|
|
|
CalcKey ck = {prevPosKey, nextPosKey};
|
|
|
|
UnpairedPosInfo *prevUnpairInfoP = nullptr;
|
|
|
|
UnpairedPosInfo *prevUnpairInfoP = nullptr;
|
|
|
|
UnpairedPosInfo *nextUnpairInfoP = nullptr;
|
|
|
|
UnpairedPosInfo *nextUnpairInfoP = nullptr;
|
|
|
|
if (lppSmd->unpairedPosArr.find(prevPosKey) != lppSmd->unpairedPosArr.end())
|
|
|
|
|
|
|
|
prevUnpairInfoP = &lppSmd->unpairedPosArr[prevPosKey];
|
|
|
|
|
|
|
|
if (ppSmd->unpairedPosArr.find(prevPosKey) != ppSmd->unpairedPosArr.end())
|
|
|
|
|
|
|
|
nextUnpairInfoP = &ppSmd->unpairedPosArr[prevPosKey];
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (lp.unpairedPosArr.find(prevPosKey) != lp.unpairedPosArr.end())
|
|
|
|
|
|
|
|
prevUnpairInfoP = &lp.unpairedPosArr[prevPosKey];
|
|
|
|
|
|
|
|
if (p.unpairedPosArr.find(prevPosKey) != p.unpairedPosArr.end())
|
|
|
|
|
|
|
|
nextUnpairInfoP = &p.unpairedPosArr[prevPosKey];
|
|
|
|
// pos分为两种情况,根据poskey(pair中两个read分别的pos)的位置确定
|
|
|
|
// pos分为两种情况,根据poskey(pair中两个read分别的pos)的位置确定
|
|
|
|
// 1.
|
|
|
|
// 1.
|
|
|
|
// prevpos在交叉部分之前,nextpos在交叉部分之后,这种情况不需要获取pairarr中的数据;
|
|
|
|
// prevpos在交叉部分之前,nextpos在交叉部分之后,这种情况不需要获取pairarr中的数据;
|
|
|
|
@ -1252,30 +1249,29 @@ static void doIntersect(PipelineArg &pipeArg) {
|
|
|
|
if (alreadyAdd.find(ck) != alreadyAdd.end()) {
|
|
|
|
if (alreadyAdd.find(ck) != alreadyAdd.end()) {
|
|
|
|
// 之前已经添加过了,后面就不用再添加数据了,因为同一个位置可能找到两个及以上的unpair数据,处理之前的数据时候可能已经添加了这些数据
|
|
|
|
// 之前已经添加过了,后面就不用再添加数据了,因为同一个位置可能找到两个及以上的unpair数据,处理之前的数据时候可能已经添加了这些数据
|
|
|
|
addDataToPos = false;
|
|
|
|
addDataToPos = false;
|
|
|
|
} else
|
|
|
|
} else alreadyAdd.insert(ck);
|
|
|
|
alreadyAdd.insert(ck);
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if (prevPosKey < nextFirstPos) { // prevpos在交叉部分之前
|
|
|
|
if (prevPosKey < nextFirstPos) { // prevpos在交叉部分之前
|
|
|
|
auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr
|
|
|
|
auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr
|
|
|
|
prevPairArr.push_back(prevFragEnd);
|
|
|
|
prevPairArr.push_back(prevFragEnd);
|
|
|
|
if (nextPosKey <= prevLastPos && addDataToPos) { // 第二种情况
|
|
|
|
if (nextPosKey <= prevLastPos && addDataToPos) { // 第二种情况
|
|
|
|
getEqualRE(prevFragEnd, lppSmd->pairs, &prevPairArr);
|
|
|
|
getEqualRE(prevFragEnd, lp.pairs, &prevPairArr);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// 第一种情况,第二种情况下都会出现,复杂情况一
|
|
|
|
// 第一种情况,第二种情况下都会出现,复杂情况一
|
|
|
|
auto gPosInfo = g.unpairedPosArr.find(prevPosKey);
|
|
|
|
auto gPosInfo = g.unpairedPosArr.find(prevPosKey);
|
|
|
|
if (gPosInfo != g.unpairedPosArr.end()) { // 可能g和p有匹配的,刚好和该位点一致
|
|
|
|
if (gPosInfo != g.unpairedPosArr.end()) { // 可能g和p有匹配的,刚好和该位点一致
|
|
|
|
auto &gUnpairInfo = gPosInfo->second;
|
|
|
|
auto &gUnpairInfo = gPosInfo->second;
|
|
|
|
auto pPosInfo = ppSmd->unpairedPosArr.find(nextPosKey);
|
|
|
|
auto pPosInfo = p.unpairedPosArr.find(nextPosKey);
|
|
|
|
if (pPosInfo != ppSmd->unpairedPosArr.end()) {
|
|
|
|
if (pPosInfo != p.unpairedPosArr.end()) {
|
|
|
|
auto &pUnpairInfo = pPosInfo->second;
|
|
|
|
auto &pUnpairInfo = pPosInfo->second;
|
|
|
|
for (auto &rn : gUnpairInfo.readNameSet) { // 遍历每一个readname,看是否有匹配的
|
|
|
|
for (auto &rn : gUnpairInfo.readNameSet) { // 遍历每一个readname,看是否有匹配的
|
|
|
|
if (pUnpairInfo.readNameSet.find(rn) != pUnpairInfo.readNameSet.end()) {
|
|
|
|
if (pUnpairInfo.readNameSet.find(rn) != pUnpairInfo.readNameSet.end()) {
|
|
|
|
auto pe = g.unpairedDic[rn].unpairedRE;
|
|
|
|
auto pe = g.unpairedDic[rn].unpairedRE;
|
|
|
|
auto fe = ppSmd->unpairedDic[rn].unpairedRE;
|
|
|
|
auto fe = p.unpairedDic[rn].unpairedRE;
|
|
|
|
modifyPairedEnds(fe, &pe);
|
|
|
|
modifyPairedEnds(fe, &pe);
|
|
|
|
prevPairArr.push_back(pe);
|
|
|
|
prevPairArr.push_back(pe);
|
|
|
|
g.unpairedDic.erase(rn);
|
|
|
|
g.unpairedDic.erase(rn);
|
|
|
|
ppSmd->unpairedDic.erase(rn);
|
|
|
|
p.unpairedDic.erase(rn);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
@ -1290,8 +1286,8 @@ static void doIntersect(PipelineArg &pipeArg) {
|
|
|
|
auto &prevPairArr = prevUnpairInfoP->pairArr;
|
|
|
|
auto &prevPairArr = prevUnpairInfoP->pairArr;
|
|
|
|
prevPairArr.push_back(prevFragEnd);
|
|
|
|
prevPairArr.push_back(prevFragEnd);
|
|
|
|
if (addDataToPos) {
|
|
|
|
if (addDataToPos) {
|
|
|
|
getEqualRE(prevFragEnd, ppSmd->pairs, &prevPairArr);
|
|
|
|
getEqualRE(prevFragEnd, p.pairs, &prevPairArr);
|
|
|
|
getEqualRE(prevFragEnd, ppSmd->pairs, &nextPairArr);
|
|
|
|
getEqualRE(prevFragEnd, p.pairs, &nextPairArr);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// 将数据放到next task里,(这个位点以后会可能还会计算到,目前方案是都计算,只是把冗余剔除)
|
|
|
|
// 将数据放到next task里,(这个位点以后会可能还会计算到,目前方案是都计算,只是把冗余剔除)
|
|
|
|
recalcPos[ck] = nextPosInfo.taskSeq;
|
|
|
|
recalcPos[ck] = nextPosInfo.taskSeq;
|
|
|
|
@ -1302,26 +1298,28 @@ static void doIntersect(PipelineArg &pipeArg) {
|
|
|
|
auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr
|
|
|
|
auto &prevPairArr = prevUnpairInfoP->pairArr; // prevUnpairInfoP肯定不是nullptr
|
|
|
|
prevPairArr.push_back(prevFragEnd);
|
|
|
|
prevPairArr.push_back(prevFragEnd);
|
|
|
|
if (addDataToPos) // 第二种情况
|
|
|
|
if (addDataToPos) // 第二种情况
|
|
|
|
getEqualRE(prevFragEnd, ppSmd->pairs, &prevPairArr);
|
|
|
|
getEqualRE(prevFragEnd, p.pairs, &prevPairArr);
|
|
|
|
recalcPos[ck] = prevPosInfo.taskSeq;
|
|
|
|
recalcPos[ck] = prevPosInfo.taskSeq;
|
|
|
|
std::sort(prevPairArr.begin(), prevPairArr.end());
|
|
|
|
std::sort(prevPairArr.begin(), prevPairArr.end());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
} else { // 第四种情况
|
|
|
|
} else { // 第四种情况
|
|
|
|
if (prevUnpairInfoP == nullptr) {
|
|
|
|
if (prevUnpairInfoP == nullptr) {
|
|
|
|
prevUnpairInfoP = &lppSmd->unpairedPosArr[prevPosKey];
|
|
|
|
prevUnpairInfoP = &lp.unpairedPosArr[prevPosKey];
|
|
|
|
prevUnpairInfoP->taskSeq = lp.taskSeq;
|
|
|
|
prevUnpairInfoP->taskSeq = lp.taskSeq;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
auto &prevPairArr = prevUnpairInfoP->pairArr;
|
|
|
|
auto &prevPairArr = prevUnpairInfoP->pairArr;
|
|
|
|
prevPairArr.push_back(prevFragEnd);
|
|
|
|
prevPairArr.push_back(prevFragEnd);
|
|
|
|
if (addDataToPos) {
|
|
|
|
if (addDataToPos) {
|
|
|
|
getEqualRE(prevFragEnd, lppSmd->pairs, &prevPairArr);
|
|
|
|
getEqualRE(prevFragEnd, lp.pairs, &prevPairArr);
|
|
|
|
getEqualRE(prevFragEnd, ppSmd->pairs, &prevPairArr);
|
|
|
|
getEqualRE(prevFragEnd, p.pairs, &prevPairArr);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
recalcPos[ck] = prevPosInfo.taskSeq;
|
|
|
|
recalcPos[ck] = prevPosInfo.taskSeq;
|
|
|
|
std::sort(prevPairArr.begin(), prevPairArr.end());
|
|
|
|
std::sort(prevPairArr.begin(), prevPairArr.end());
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ppSmd->unpairedDic.erase(readName); // 在next task里删除该read
|
|
|
|
p.unpairedDic.erase(readName); // 在next task里删除该read
|
|
|
|
|
|
|
|
// p.unpairedPosArr.erase(nextPosKey);
|
|
|
|
|
|
|
|
// p.unpairedPosArr[nextPosKey].
|
|
|
|
} else if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read
|
|
|
|
} else if (g.unpairedDic.find(readName) != g.unpairedDic.end()) { // 在遗留数据中找到了匹配的read
|
|
|
|
auto &remainPosInfo = g.unpairedDic[readName];
|
|
|
|
auto &remainPosInfo = g.unpairedDic[readName];
|
|
|
|
auto remainFragEnd = remainPosInfo.unpairedRE;
|
|
|
|
auto remainFragEnd = remainPosInfo.unpairedRE;
|
|
|
|
@ -1341,6 +1339,7 @@ static void doIntersect(PipelineArg &pipeArg) {
|
|
|
|
addToGlobal.insert(prevPosKey);
|
|
|
|
addToGlobal.insert(prevPosKey);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// return;
|
|
|
|
map<int64_t, TaskSeqDupInfo> taskChanged;
|
|
|
|
map<int64_t, TaskSeqDupInfo> taskChanged;
|
|
|
|
MDSet<int64_t> posProcessed;
|
|
|
|
MDSet<int64_t> posProcessed;
|
|
|
|
for (auto &e : recalcPos) {
|
|
|
|
for (auto &e : recalcPos) {
|
|
|
|
@ -1355,7 +1354,7 @@ static void doIntersect(PipelineArg &pipeArg) {
|
|
|
|
if (taskSeq < lp.taskSeq)
|
|
|
|
if (taskSeq < lp.taskSeq)
|
|
|
|
pairArrP = &g.unpairedPosArr[posKey].pairArr;
|
|
|
|
pairArrP = &g.unpairedPosArr[posKey].pairArr;
|
|
|
|
else
|
|
|
|
else
|
|
|
|
pairArrP = &lppSmd->unpairedPosArr[posKey].pairArr;
|
|
|
|
pairArrP = &lp.unpairedPosArr[posKey].pairArr;
|
|
|
|
processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.singletonIdx, &t.notDupIdx,
|
|
|
|
processPairs(*pairArrP, &t.dupIdx, &t.opticalDupIdx, &t.repIdx, &t.singletonIdx, &t.notDupIdx,
|
|
|
|
&t.notOpticalDupIdx, &t.notRepIdx, &t.notSingletonIdx);
|
|
|
|
&t.notOpticalDupIdx, &t.notRepIdx, &t.notSingletonIdx);
|
|
|
|
if (taskSeq < lp.taskSeq)
|
|
|
|
if (taskSeq < lp.taskSeq)
|
|
|
|
@ -1363,30 +1362,36 @@ static void doIntersect(PipelineArg &pipeArg) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
// 最后再添加,以防开始赋值,后来这个位置要是又添加了新的数据
|
|
|
|
// 最后再添加,以防开始赋值,后来这个位置要是又添加了新的数据
|
|
|
|
// 放在这里,因为lp中的unpairedPosArr中的readends可能会被修改(比如optical duplicate)
|
|
|
|
// 放在这里,因为lp中的unpairedPosArr中的readends可能会被修改(比如optical duplicate)
|
|
|
|
for (auto posKey : addToGlobal) {
|
|
|
|
// for (auto posKey : addToGlobal) {
|
|
|
|
g.unpairedPosArr[posKey] = lppSmd->unpairedPosArr[posKey];
|
|
|
|
// g.unpairedPosArr[posKey] = lp.unpairedPosArr[posKey];
|
|
|
|
}
|
|
|
|
// }
|
|
|
|
|
|
|
|
// return;
|
|
|
|
|
|
|
|
|
|
|
|
// 更新结果
|
|
|
|
// 更新结果
|
|
|
|
// for (auto &e : taskChanged) {
|
|
|
|
for (auto &e : taskChanged) {
|
|
|
|
// auto taskSeq = e.first;
|
|
|
|
auto taskSeq = e.first;
|
|
|
|
// auto &t = e.second;
|
|
|
|
auto &t = e.second;
|
|
|
|
// if (taskSeq < lp.taskSeq) {
|
|
|
|
if (taskSeq < lp.taskSeq) {
|
|
|
|
// refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx,
|
|
|
|
refeshTaskDupInfo(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx,
|
|
|
|
// t.notRepIdx, t.notSingletonIdx, g.latterDupIdxArr[taskSeq],
|
|
|
|
t.notRepIdx, t.notSingletonIdx, g.latterDupIdxArr[taskSeq],
|
|
|
|
// g.latterOpticalDupIdxArr[taskSeq], g.latterRepIdxArr[taskSeq],
|
|
|
|
g.latterOpticalDupIdxArr[taskSeq], g.latterRepIdxArr[taskSeq],
|
|
|
|
// g.latterSingletonIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq],
|
|
|
|
g.latterSingletonIdxArr[taskSeq], g.latterNotDupIdxArr[taskSeq],
|
|
|
|
// g.latterNotOpticalDupIdxArr[taskSeq], g.latterNotRepIdxArr[taskSeq],
|
|
|
|
g.latterNotOpticalDupIdxArr[taskSeq], g.latterNotRepIdxArr[taskSeq],
|
|
|
|
// g.latterNotSingletonIdxArr[taskSeq]);
|
|
|
|
g.latterNotSingletonIdxArr[taskSeq]);
|
|
|
|
// } else if (taskSeq == lp.taskSeq) {
|
|
|
|
} else if (taskSeq == lp.taskSeq) {
|
|
|
|
// refreshMarkDupData(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx,
|
|
|
|
refreshMarkDupData(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx,
|
|
|
|
// t.notRepIdx, t.notSingletonIdx, lp, p);
|
|
|
|
t.notRepIdx, t.notSingletonIdx, lp, p);
|
|
|
|
// } else {
|
|
|
|
} else {
|
|
|
|
// refreshMarkDupData(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx,
|
|
|
|
refreshMarkDupData(t.dupIdx, t.opticalDupIdx, t.repIdx, t.singletonIdx, t.notDupIdx, t.notOpticalDupIdx,
|
|
|
|
// t.notRepIdx, t.notSingletonIdx, p, lp); // 把结果放到p中
|
|
|
|
t.notRepIdx, t.notSingletonIdx, p, lp); // 把结果放到p中
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
// for (auto posKey : addToGlobal) {
|
|
|
|
|
|
|
|
// g.unpairedPosArr[posKey] = lp.unpairedPosArr[posKey];
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
// }
|
|
|
|
|
|
|
|
*/
|
|
|
|
// return;
|
|
|
|
|
|
|
|
|
|
|
|
// 将dupidx放进全局数据
|
|
|
|
// 将dupidx放进全局数据
|
|
|
|
g.latterDupIdxArr.push_back(DPSet<DupInfo>());
|
|
|
|
g.latterDupIdxArr.push_back(DPSet<DupInfo>());
|
|
|
|
g.latterOpticalDupIdxArr.push_back(MDSet<int64_t>());
|
|
|
|
g.latterOpticalDupIdxArr.push_back(MDSet<int64_t>());
|
|
|
|
@ -1417,7 +1422,7 @@ static void doIntersect(PipelineArg &pipeArg) {
|
|
|
|
auto &vSingletonIdx = g.singletonIdxArr.back();
|
|
|
|
auto &vSingletonIdx = g.singletonIdxArr.back();
|
|
|
|
vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end());
|
|
|
|
vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end());
|
|
|
|
std::sort(vSingletonIdx.begin(), vSingletonIdx.end());
|
|
|
|
std::sort(vSingletonIdx.begin(), vSingletonIdx.end());
|
|
|
|
gSingletonNum += lp.pairSingletonIdx.size();
|
|
|
|
// gSingletonNum += lp.pairSingletonIdx.size();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
static void *pipeRead(void *data) {
|
|
|
|
static void *pipeRead(void *data) {
|
|
|
|
@ -1426,8 +1431,10 @@ static void *pipeRead(void *data) {
|
|
|
|
inBamBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem);
|
|
|
|
inBamBuf.Init(g_inBamFp, g_inBamHeader, g_gArg.max_mem);
|
|
|
|
int64_t readNumSum = 0;
|
|
|
|
int64_t readNumSum = 0;
|
|
|
|
while (1) {
|
|
|
|
while (1) {
|
|
|
|
|
|
|
|
tm_arr[10].acc_start();
|
|
|
|
POSSESS(pipeArg.readSig);
|
|
|
|
POSSESS(pipeArg.readSig);
|
|
|
|
WAIT_FOR(pipeArg.readSig, NOT_TO_BE, 1); // 只有一个坑,因为在bambuf内部支持异步读取
|
|
|
|
WAIT_FOR(pipeArg.readSig, NOT_TO_BE, 1); // 只有一个坑,因为在bambuf内部支持异步读取
|
|
|
|
|
|
|
|
tm_arr[10].acc_end();
|
|
|
|
size_t readNum = 0;
|
|
|
|
size_t readNum = 0;
|
|
|
|
tm_arr[0].acc_start();
|
|
|
|
tm_arr[0].acc_start();
|
|
|
|
if (inBamBuf.ReadStat() >= 0) readNum = inBamBuf.ReadBam(); // 读入新一轮的数据
|
|
|
|
if (inBamBuf.ReadStat() >= 0) readNum = inBamBuf.ReadBam(); // 读入新一轮的数据
|
|
|
|
@ -1456,17 +1463,23 @@ static void *pipeGenRE(void *data) {
|
|
|
|
PipelineArg &pipeArg = *(PipelineArg *)data;
|
|
|
|
PipelineArg &pipeArg = *(PipelineArg *)data;
|
|
|
|
auto &genREData = pipeArg.genREData;
|
|
|
|
auto &genREData = pipeArg.genREData;
|
|
|
|
// init generate read ends data by num_thread
|
|
|
|
// init generate read ends data by num_thread
|
|
|
|
|
|
|
|
int genREThread = pipeArg.genREThreadNum;
|
|
|
|
|
|
|
|
// / 4;
|
|
|
|
for (int i = 0; i < pipeArg.GENBUFNUM; ++i) {
|
|
|
|
for (int i = 0; i < pipeArg.GENBUFNUM; ++i) {
|
|
|
|
genREData[i].Init(g_gArg.num_threads);
|
|
|
|
genREData[i].Init(genREThread);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
while(1) {
|
|
|
|
while(1) {
|
|
|
|
|
|
|
|
tm_arr[11].acc_start();
|
|
|
|
POSSESS(pipeArg.readSig);
|
|
|
|
POSSESS(pipeArg.readSig);
|
|
|
|
WAIT_FOR(pipeArg.readSig, NOT_TO_BE, 0); // 等待有数据
|
|
|
|
WAIT_FOR(pipeArg.readSig, NOT_TO_BE, 0); // 等待有数据
|
|
|
|
POSSESS(pipeArg.genRESig);
|
|
|
|
POSSESS(pipeArg.genRESig);
|
|
|
|
|
|
|
|
tm_arr[11].acc_end();
|
|
|
|
// cout << "gre: " << PEEK_LOCK(pipeArg.genRESig) << endl;
|
|
|
|
// cout << "gre: " << PEEK_LOCK(pipeArg.genRESig) << endl;
|
|
|
|
|
|
|
|
tm_arr[21].acc_start();
|
|
|
|
WAIT_FOR(pipeArg.genRESig, NOT_TO_BE, pipeArg.GENBUFNUM); // 有BUFNUM个坑
|
|
|
|
WAIT_FOR(pipeArg.genRESig, NOT_TO_BE, pipeArg.GENBUFNUM); // 有BUFNUM个坑
|
|
|
|
RELEASE(pipeArg.genRESig); // 因为有不止一个位置,所以要释放
|
|
|
|
RELEASE(pipeArg.genRESig); // 因为有不止一个位置,所以要释放
|
|
|
|
|
|
|
|
tm_arr[21].acc_end();
|
|
|
|
|
|
|
|
|
|
|
|
if (pipeArg.readFinish) {
|
|
|
|
if (pipeArg.readFinish) {
|
|
|
|
POSSESS(pipeArg.genRESig);
|
|
|
|
POSSESS(pipeArg.genRESig);
|
|
|
|
@ -1492,13 +1505,17 @@ static void *pipeSort(void *data) {
|
|
|
|
PipelineArg &pipeArg = *(PipelineArg *)data;
|
|
|
|
PipelineArg &pipeArg = *(PipelineArg *)data;
|
|
|
|
|
|
|
|
|
|
|
|
while (1) {
|
|
|
|
while (1) {
|
|
|
|
|
|
|
|
tm_arr[12].acc_start();
|
|
|
|
POSSESS(pipeArg.genRESig);
|
|
|
|
POSSESS(pipeArg.genRESig);
|
|
|
|
WAIT_FOR(pipeArg.genRESig, NOT_TO_BE, 0); // 等待有数据
|
|
|
|
WAIT_FOR(pipeArg.genRESig, NOT_TO_BE, 0); // 等待有数据
|
|
|
|
RELEASE(pipeArg.genRESig);
|
|
|
|
RELEASE(pipeArg.genRESig);
|
|
|
|
|
|
|
|
tm_arr[12].acc_end();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tm_arr[22].acc_start();
|
|
|
|
POSSESS(pipeArg.sortSig);
|
|
|
|
POSSESS(pipeArg.sortSig);
|
|
|
|
WAIT_FOR(pipeArg.sortSig, NOT_TO_BE, pipeArg.SORTBUFNUM); // 有BUFNUM个位置
|
|
|
|
WAIT_FOR(pipeArg.sortSig, NOT_TO_BE, pipeArg.SORTBUFNUM); // 有BUFNUM个位置
|
|
|
|
RELEASE(pipeArg.sortSig);
|
|
|
|
RELEASE(pipeArg.sortSig);
|
|
|
|
|
|
|
|
tm_arr[22].acc_end();
|
|
|
|
|
|
|
|
|
|
|
|
if (pipeArg.genREFinish) {
|
|
|
|
if (pipeArg.genREFinish) {
|
|
|
|
// 处理完剩余数据
|
|
|
|
// 处理完剩余数据
|
|
|
|
@ -1532,13 +1549,17 @@ static void *pipeMarkDup(void *data) {
|
|
|
|
PipelineArg &pipeArg = *(PipelineArg *)data;
|
|
|
|
PipelineArg &pipeArg = *(PipelineArg *)data;
|
|
|
|
|
|
|
|
|
|
|
|
while (1) {
|
|
|
|
while (1) {
|
|
|
|
|
|
|
|
tm_arr[13].acc_start();
|
|
|
|
POSSESS(pipeArg.sortSig);
|
|
|
|
POSSESS(pipeArg.sortSig);
|
|
|
|
WAIT_FOR(pipeArg.sortSig, NOT_TO_BE, 0); // 等待有数据
|
|
|
|
WAIT_FOR(pipeArg.sortSig, NOT_TO_BE, 0); // 等待有数据
|
|
|
|
RELEASE(pipeArg.sortSig);
|
|
|
|
RELEASE(pipeArg.sortSig);
|
|
|
|
|
|
|
|
tm_arr[13].acc_end();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
tm_arr[23].acc_start();
|
|
|
|
POSSESS(pipeArg.markDupSig);
|
|
|
|
POSSESS(pipeArg.markDupSig);
|
|
|
|
WAIT_FOR(pipeArg.markDupSig, NOT_TO_BE, pipeArg.MARKBUFNUM);
|
|
|
|
WAIT_FOR(pipeArg.markDupSig, NOT_TO_BE, pipeArg.MARKBUFNUM);
|
|
|
|
RELEASE(pipeArg.markDupSig);
|
|
|
|
RELEASE(pipeArg.markDupSig);
|
|
|
|
|
|
|
|
tm_arr[23].acc_end();
|
|
|
|
|
|
|
|
|
|
|
|
if (pipeArg.sortFinish) {
|
|
|
|
if (pipeArg.sortFinish) {
|
|
|
|
// 应该还得处理剩余的数据
|
|
|
|
// 应该还得处理剩余的数据
|
|
|
|
@ -1554,7 +1575,7 @@ static void *pipeMarkDup(void *data) {
|
|
|
|
break;
|
|
|
|
break;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
/* 冗余检测 readends */
|
|
|
|
/* 冗余检测 readends */
|
|
|
|
// cout << "markdup order: " << pipeArg.markDupOrder << endl;
|
|
|
|
cout << "markdup order: " << pipeArg.markDupOrder << endl;
|
|
|
|
tm_arr[3].acc_start();
|
|
|
|
tm_arr[3].acc_start();
|
|
|
|
doMarkDup(pipeArg);
|
|
|
|
doMarkDup(pipeArg);
|
|
|
|
tm_arr[3].acc_end();
|
|
|
|
tm_arr[3].acc_end();
|
|
|
|
@ -1569,11 +1590,13 @@ static void *pipeMarkDup(void *data) {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
static void *pipeIntersect(void *data) {
|
|
|
|
static void *pipeIntersect(void *data) {
|
|
|
|
PipelineArg &pipeArg = *(PipelineArg *)data;
|
|
|
|
PipelineArg &pipeArg = *(PipelineArg *)data;
|
|
|
|
|
|
|
|
pipeArg.intersectOrder = 1;
|
|
|
|
while (1) {
|
|
|
|
while (1) {
|
|
|
|
|
|
|
|
tm_arr[14].acc_start();
|
|
|
|
POSSESS(pipeArg.markDupSig);
|
|
|
|
POSSESS(pipeArg.markDupSig);
|
|
|
|
WAIT_FOR(pipeArg.markDupSig, NOT_TO_BE, 0); // 等待有数据
|
|
|
|
WAIT_FOR(pipeArg.markDupSig, TO_BE_MORE_THAN, 1); // 等待有数据
|
|
|
|
RELEASE(pipeArg.markDupSig);
|
|
|
|
RELEASE(pipeArg.markDupSig);
|
|
|
|
|
|
|
|
tm_arr[14].acc_end();
|
|
|
|
|
|
|
|
|
|
|
|
if (pipeArg.markDupFinish) {
|
|
|
|
if (pipeArg.markDupFinish) {
|
|
|
|
while (pipeArg.intersectOrder < pipeArg.markDupOrder) {
|
|
|
|
while (pipeArg.intersectOrder < pipeArg.markDupOrder) {
|
|
|
|
@ -1587,11 +1610,13 @@ static void *pipeIntersect(void *data) {
|
|
|
|
/* 交叉数据处理 readends */
|
|
|
|
/* 交叉数据处理 readends */
|
|
|
|
// cout << "intersect order: " << pipeArg.intersectOrder << endl;
|
|
|
|
// cout << "intersect order: " << pipeArg.intersectOrder << endl;
|
|
|
|
tm_arr[4].acc_start();
|
|
|
|
tm_arr[4].acc_start();
|
|
|
|
|
|
|
|
// cout << "intersect markdup size: " << PEEK_LOCK(pipeArg.markDupSig) << endl;
|
|
|
|
doIntersect(pipeArg);
|
|
|
|
doIntersect(pipeArg);
|
|
|
|
tm_arr[4].acc_end();
|
|
|
|
tm_arr[4].acc_end();
|
|
|
|
|
|
|
|
|
|
|
|
POSSESS(pipeArg.markDupSig);
|
|
|
|
POSSESS(pipeArg.markDupSig);
|
|
|
|
TWIST(pipeArg.markDupSig, BY, -1);
|
|
|
|
TWIST(pipeArg.markDupSig, BY, -1);
|
|
|
|
|
|
|
|
|
|
|
|
pipeArg.intersectOrder += 1;
|
|
|
|
pipeArg.intersectOrder += 1;
|
|
|
|
}
|
|
|
|
}
|
|
|
|
return 0;
|
|
|
|
return 0;
|
|
|
|
@ -1600,9 +1625,8 @@ static void *pipeIntersect(void *data) {
|
|
|
|
/* 当所有任务结束后,global data里还有未处理的数据 */
|
|
|
|
/* 当所有任务结束后,global data里还有未处理的数据 */
|
|
|
|
static void mergeAllTask(PipelineArg &pipeArg) {
|
|
|
|
static void mergeAllTask(PipelineArg &pipeArg) {
|
|
|
|
// cout << "last task start" << endl;
|
|
|
|
// cout << "last task start" << endl;
|
|
|
|
MarkDupData &mdData = pipeArg.markDupData[(pipeArg.markDupOrder - 1) % pipeArg.MARKBUFNUM];
|
|
|
|
MarkDupData &lp = pipeArg.markDupData[(pipeArg.markDupOrder - 1) % pipeArg.MARKBUFNUM];
|
|
|
|
IntersectData &g = pipeArg.intersectData;
|
|
|
|
IntersectData &g = pipeArg.intersectData;
|
|
|
|
auto &lp = *(SortMarkData *)mdData.pSmd;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
// 遗留的未匹配的pair
|
|
|
|
// 遗留的未匹配的pair
|
|
|
|
for (auto &prevUnpair : lp.unpairedDic) { // 遍历上一个任务中的每个未匹配的read
|
|
|
|
for (auto &prevUnpair : lp.unpairedDic) { // 遍历上一个任务中的每个未匹配的read
|
|
|
|
@ -1673,30 +1697,33 @@ static void mergeAllTask(PipelineArg &pipeArg) {
|
|
|
|
|
|
|
|
|
|
|
|
g.dupIdxArr.push_back(vector<DupInfo>());
|
|
|
|
g.dupIdxArr.push_back(vector<DupInfo>());
|
|
|
|
auto &vIdx = g.dupIdxArr.back();
|
|
|
|
auto &vIdx = g.dupIdxArr.back();
|
|
|
|
mdData.pairDupIdx.insert(mdData.fragDupIdx.begin(), mdData.fragDupIdx.end());
|
|
|
|
lp.pairDupIdx.insert(lp.fragDupIdx.begin(), lp.fragDupIdx.end());
|
|
|
|
vIdx.insert(vIdx.end(), mdData.pairDupIdx.begin(), mdData.pairDupIdx.end());
|
|
|
|
vIdx.insert(vIdx.end(), lp.pairDupIdx.begin(), lp.pairDupIdx.end());
|
|
|
|
std::sort(vIdx.begin(), vIdx.end());
|
|
|
|
std::sort(vIdx.begin(), vIdx.end());
|
|
|
|
|
|
|
|
|
|
|
|
g.opticalDupIdxArr.push_back(vector<int64_t>());
|
|
|
|
g.opticalDupIdxArr.push_back(vector<int64_t>());
|
|
|
|
auto &vOpticalIdx = g.opticalDupIdxArr.back();
|
|
|
|
auto &vOpticalIdx = g.opticalDupIdxArr.back();
|
|
|
|
vOpticalIdx.insert(vOpticalIdx.end(), mdData.pairOpticalDupIdx.begin(), mdData.pairOpticalDupIdx.end());
|
|
|
|
vOpticalIdx.insert(vOpticalIdx.end(), lp.pairOpticalDupIdx.begin(), lp.pairOpticalDupIdx.end());
|
|
|
|
std::sort(vOpticalIdx.begin(), vOpticalIdx.end());
|
|
|
|
std::sort(vOpticalIdx.begin(), vOpticalIdx.end());
|
|
|
|
|
|
|
|
|
|
|
|
g.repIdxArr.push_back(vector<DupInfo>());
|
|
|
|
g.repIdxArr.push_back(vector<DupInfo>());
|
|
|
|
auto &vRepIdx = g.repIdxArr.back();
|
|
|
|
auto &vRepIdx = g.repIdxArr.back();
|
|
|
|
vRepIdx.insert(vRepIdx.end(), mdData.pairRepIdx.begin(), mdData.pairRepIdx.end());
|
|
|
|
vRepIdx.insert(vRepIdx.end(), lp.pairRepIdx.begin(), lp.pairRepIdx.end());
|
|
|
|
std::sort(vRepIdx.begin(), vRepIdx.end());
|
|
|
|
std::sort(vRepIdx.begin(), vRepIdx.end());
|
|
|
|
|
|
|
|
|
|
|
|
g.singletonIdxArr.push_back(vector<int64_t>());
|
|
|
|
g.singletonIdxArr.push_back(vector<int64_t>());
|
|
|
|
auto &vSingletonIdx = g.singletonIdxArr.back();
|
|
|
|
auto &vSingletonIdx = g.singletonIdxArr.back();
|
|
|
|
vSingletonIdx.insert(vSingletonIdx.end(), mdData.pairSingletonIdx.begin(), mdData.pairSingletonIdx.end());
|
|
|
|
vSingletonIdx.insert(vSingletonIdx.end(), lp.pairSingletonIdx.begin(), lp.pairSingletonIdx.end());
|
|
|
|
std::sort(vSingletonIdx.begin(), vSingletonIdx.end());
|
|
|
|
std::sort(vSingletonIdx.begin(), vSingletonIdx.end());
|
|
|
|
gSingletonNum += mdData.pairSingletonIdx.size();
|
|
|
|
// gSingletonNum += lp.pairSingletonIdx.size();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
static void startPipeline() {
|
|
|
|
static void startPipeline() {
|
|
|
|
Timer::log_time("pipeline start");
|
|
|
|
Timer::log_time("pipeline start");
|
|
|
|
PipelineArg pipeArg;
|
|
|
|
PipelineArg pipeArg;
|
|
|
|
|
|
|
|
pipeArg.genREThreadNum = g_gArg.num_threads;
|
|
|
|
|
|
|
|
// / 2;
|
|
|
|
|
|
|
|
// g_gArg.num_threads / 2;
|
|
|
|
|
|
|
|
|
|
|
|
pthread_t tidArr[5];
|
|
|
|
pthread_t tidArr[5];
|
|
|
|
pthread_create(&tidArr[0], 0, pipeRead, &pipeArg);
|
|
|
|
pthread_create(&tidArr[0], 0, pipeRead, &pipeArg);
|
|
|
|
@ -1716,21 +1743,48 @@ static void startPipeline() {
|
|
|
|
for (auto &arr : pipeArg.intersectData.opticalDupIdxArr) opticalDupNum += arr.size();
|
|
|
|
for (auto &arr : pipeArg.intersectData.opticalDupIdxArr) opticalDupNum += arr.size();
|
|
|
|
for (auto &arr : pipeArg.intersectData.singletonIdxArr) singletonNum += arr.size();
|
|
|
|
for (auto &arr : pipeArg.intersectData.singletonIdxArr) singletonNum += arr.size();
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
map<int64_t, int> dup;
|
|
|
|
|
|
|
|
#if 0
|
|
|
|
|
|
|
|
int taskSeq = 0;
|
|
|
|
|
|
|
|
for (auto &arr : pipeArg.intersectData.dupIdxArr) {
|
|
|
|
|
|
|
|
for (auto idx : arr) {
|
|
|
|
|
|
|
|
if (dup.find(idx.idx) != dup.end()) {
|
|
|
|
|
|
|
|
//if (taskSeq - 1 > dup[idx])
|
|
|
|
|
|
|
|
cout << "dup index: " << dup[idx] << '\t' << taskSeq << '\t' << idx << endl;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
dup[idx.idx] = taskSeq;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
// cout << taskSeq << "\t" << arr.size() << endl;
|
|
|
|
|
|
|
|
taskSeq++;
|
|
|
|
|
|
|
|
}
|
|
|
|
|
|
|
|
#endif
|
|
|
|
|
|
|
|
|
|
|
|
cout << "Final read order: " << pipeArg.readOrder << endl;
|
|
|
|
cout << "Final read order: " << pipeArg.readOrder << endl;
|
|
|
|
cout << "Final gen order: " << pipeArg.genREOrder << endl;
|
|
|
|
cout << "Final gen order: " << pipeArg.genREOrder << endl;
|
|
|
|
cout << "Final sort order: " << pipeArg.sortOrder << endl;
|
|
|
|
cout << "Final sort order: " << pipeArg.sortOrder << endl;
|
|
|
|
cout << "Final mark order: " << pipeArg.markDupOrder << endl;
|
|
|
|
cout << "Final mark order: " << pipeArg.markDupOrder << endl;
|
|
|
|
cout << "Final inter order: " << pipeArg.intersectOrder << endl;
|
|
|
|
cout << "Final inter order: " << pipeArg.intersectOrder << endl;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cout << "w read time : " << tm_arr[10].acc_seconds_elapsed() << endl;
|
|
|
|
|
|
|
|
cout << "w gen time : " << tm_arr[11].acc_seconds_elapsed() << endl;
|
|
|
|
|
|
|
|
cout << "w sort time : " << tm_arr[12].acc_seconds_elapsed() << endl;
|
|
|
|
|
|
|
|
cout << "w markdup time : " << tm_arr[13].acc_seconds_elapsed() << endl;
|
|
|
|
|
|
|
|
cout << "w intersect time: " << tm_arr[14].acc_seconds_elapsed() << endl;
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cout << "w1 gen time : " << tm_arr[21].acc_seconds_elapsed() << endl;
|
|
|
|
|
|
|
|
cout << "w1 sort time : " << tm_arr[22].acc_seconds_elapsed() << endl;
|
|
|
|
|
|
|
|
cout << "w1 markdup time : " << tm_arr[23].acc_seconds_elapsed() << endl;
|
|
|
|
|
|
|
|
|
|
|
|
cout << "read time : " << tm_arr[0].acc_seconds_elapsed() << endl;
|
|
|
|
cout << "read time : " << tm_arr[0].acc_seconds_elapsed() << endl;
|
|
|
|
cout << "gen time : " << tm_arr[1].acc_seconds_elapsed() << endl;
|
|
|
|
cout << "gen time : " << tm_arr[1].acc_seconds_elapsed() << endl;
|
|
|
|
cout << "sort time : " << tm_arr[2].acc_seconds_elapsed() << endl;
|
|
|
|
cout << "sort time : " << tm_arr[2].acc_seconds_elapsed() << endl;
|
|
|
|
cout << "markdup time : " << tm_arr[3].acc_seconds_elapsed() << endl;
|
|
|
|
cout << "markdup time : " << tm_arr[3].acc_seconds_elapsed() << endl;
|
|
|
|
cout << "intersect time: " << tm_arr[4].acc_seconds_elapsed() << endl;
|
|
|
|
cout << "intersect time: " << tm_arr[4].acc_seconds_elapsed() << endl;
|
|
|
|
|
|
|
|
|
|
|
|
cout << "copy time: " << tm_arr[5].acc_seconds_elapsed() << endl;
|
|
|
|
cout << "copy time: " << tm_arr[5].acc_seconds_elapsed() << endl;
|
|
|
|
cout << "merge al6 time: " << tm_arr[6].acc_seconds_elapsed() << endl;
|
|
|
|
cout << "merge al6 time: " << tm_arr[6].acc_seconds_elapsed() << endl;
|
|
|
|
|
|
|
|
|
|
|
|
cout << "dup num : " << dupNum << endl;
|
|
|
|
cout << "dup num : " << dupNum << "\t" << dup.size() << endl;
|
|
|
|
cout << "optical dup num : " << opticalDupNum / 2 << "\t" << opticalDupNum << endl;
|
|
|
|
cout << "optical dup num : " << opticalDupNum / 2 << "\t" << opticalDupNum << endl;
|
|
|
|
cout << "singleton num : " << singletonNum << endl;
|
|
|
|
cout << "singleton num : " << singletonNum << endl;
|
|
|
|
|
|
|
|
|
|
|
|
@ -1779,20 +1833,20 @@ void pipelineMarkDups() {
|
|
|
|
|
|
|
|
|
|
|
|
tm_arr[0].acc_start();
|
|
|
|
tm_arr[0].acc_start();
|
|
|
|
Timer t1;
|
|
|
|
Timer t1;
|
|
|
|
generateReadEnds(pdArg, curArgP);
|
|
|
|
// generateReadEnds(pdArg, curArgP);
|
|
|
|
// cout << "calc read end time: " << t1.seconds_elapsed() << " s" << endl;
|
|
|
|
// cout << "calc read end time: " << t1.seconds_elapsed() << " s" << endl;
|
|
|
|
tm_arr[0].acc_end();
|
|
|
|
tm_arr[0].acc_end();
|
|
|
|
|
|
|
|
|
|
|
|
tm_arr[1].acc_start();
|
|
|
|
tm_arr[1].acc_start();
|
|
|
|
t1.reinit();
|
|
|
|
t1.reinit();
|
|
|
|
markdups(curArgP);
|
|
|
|
// markdups(curArgP);
|
|
|
|
// cout << "markdups time: " << t1.seconds_elapsed() << " s" << endl;
|
|
|
|
// cout << "markdups time: " << t1.seconds_elapsed() << " s" << endl;
|
|
|
|
tm_arr[1].acc_end();
|
|
|
|
tm_arr[1].acc_end();
|
|
|
|
|
|
|
|
|
|
|
|
if (!isFirstRound) {
|
|
|
|
if (!isFirstRound) {
|
|
|
|
tm_arr[2].acc_start();
|
|
|
|
tm_arr[2].acc_start();
|
|
|
|
t1.reinit();
|
|
|
|
t1.reinit();
|
|
|
|
handleIntersectData(lastArgP, curArgP, &gData);
|
|
|
|
// handleIntersectData(lastArgP, curArgP, &gData);
|
|
|
|
// cout << "intersect time: " << t1.seconds_elapsed() << " s" << endl;
|
|
|
|
// cout << "intersect time: " << t1.seconds_elapsed() << " s" << endl;
|
|
|
|
// addTaskIdxToSet(lastArgP, &gData);
|
|
|
|
// addTaskIdxToSet(lastArgP, &gData);
|
|
|
|
tm_arr[2].acc_end();
|
|
|
|
tm_arr[2].acc_end();
|
|
|
|
@ -1815,7 +1869,7 @@ void pipelineMarkDups() {
|
|
|
|
}
|
|
|
|
}
|
|
|
|
tm_arr[3].acc_start();
|
|
|
|
tm_arr[3].acc_start();
|
|
|
|
// 处理剩下的全局数据
|
|
|
|
// 处理剩下的全局数据
|
|
|
|
handleLastTask(lastArgP, &gData);
|
|
|
|
// handleLastTask(lastArgP, &gData);
|
|
|
|
|
|
|
|
|
|
|
|
// cout << "here 2" << endl;
|
|
|
|
// cout << "here 2" << endl;
|
|
|
|
tm_arr[3].acc_end();
|
|
|
|
tm_arr[3].acc_end();
|
|
|
|
|