实现了并行,小数据上结果一致,大数据还没测试
This commit is contained in:
parent
022e70e1f3
commit
4f9fecf078
|
|
@ -13,14 +13,16 @@
|
||||||
#include <header.h> // in htslib
|
#include <header.h> // in htslib
|
||||||
#include <htslib/sam.h>
|
#include <htslib/sam.h>
|
||||||
#include <htslib/thread_pool.h>
|
#include <htslib/thread_pool.h>
|
||||||
|
#include <klib/kthread.h>
|
||||||
|
|
||||||
#include "aux_arg.h"
|
#include "aux_arg.h"
|
||||||
#include "common_data.h"
|
#include "common_data.h"
|
||||||
|
#include "read_utils.h"
|
||||||
|
#include "recal_funcs.h"
|
||||||
|
#include "recal_utils.h"
|
||||||
#include "util/debug.h"
|
#include "util/debug.h"
|
||||||
#include "util/profiling.h"
|
#include "util/profiling.h"
|
||||||
#include "recal_utils.h"
|
#include "util/yarn.h"
|
||||||
#include "recal_funcs.h"
|
|
||||||
#include "read_utils.h"
|
|
||||||
|
|
||||||
using std::vector;
|
using std::vector;
|
||||||
|
|
||||||
|
|
@ -28,11 +30,41 @@ namespace nsgv {
|
||||||
// 全局变量 for apply bqsr
|
// 全局变量 for apply bqsr
|
||||||
samFile* gOutBamFp; // 输出文件, sam或者bam格式
|
samFile* gOutBamFp; // 输出文件, sam或者bam格式
|
||||||
sam_hdr_t* gOutBamHeader; // 输出文件的header
|
sam_hdr_t* gOutBamHeader; // 输出文件的header
|
||||||
RecalTables gRecalTables; // 保留一个全局的recalibrate tables就行了
|
// RecalTables gRecalTables; // 保留一个全局的recalibrate tables就行了
|
||||||
vector<uint8_t> gQuantizedQuals; // 读取quantized info table信息得到的,第三列数据
|
vector<uint8_t> gQuantizedQuals; // 读取quantized info table信息得到的,第三列数据
|
||||||
StableArray<bam1_t*> gRecalBams[2]; // 保存已经校正过质量分数的bam数据,双buffer
|
StableArray<bam1_t*> gRecalBams[2]; // 保存已经校正过质量分数的bam数据,双buffer
|
||||||
}; // namespace nsgv
|
}; // namespace nsgv
|
||||||
|
|
||||||
|
// 读入数据
|
||||||
|
typedef vector<BamWrap*> ReadData;
|
||||||
|
|
||||||
|
typedef StableArray<bam1_t*> ApplyData;
|
||||||
|
|
||||||
|
// 并行流水线参数for apply bqsr
|
||||||
|
struct ApplyPipeArg {
|
||||||
|
static constexpr int RECAL_BAM_BUF_NUM = 2;
|
||||||
|
uint64_t readOrder = 0;
|
||||||
|
uint64_t applyOrder = 0;
|
||||||
|
uint64_t writeOrder = 0;
|
||||||
|
int numThread = 0;
|
||||||
|
|
||||||
|
volatile int readFinish = 0;
|
||||||
|
volatile int applyFinish = 0;
|
||||||
|
|
||||||
|
yarn::lock_t* readSig;
|
||||||
|
yarn::lock_t* applySig;
|
||||||
|
|
||||||
|
ReadData readData; // 因为bam_buf本身实现了异步读取,所以这里一个readdata就够了
|
||||||
|
ApplyData applyData[RECAL_BAM_BUF_NUM]; // bqsr qpply过程需要的数据
|
||||||
|
|
||||||
|
vector<AuxVar>& auxArr;
|
||||||
|
|
||||||
|
ApplyPipeArg(vector<AuxVar>& _auxArr) : auxArr(_auxArr) {
|
||||||
|
readSig = yarn::NEW_LOCK(0);
|
||||||
|
applySig = yarn::NEW_LOCK(0);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
// 串行apply bqsr
|
// 串行apply bqsr
|
||||||
int SerialApplyBQSR(AuxVar &aux) {
|
int SerialApplyBQSR(AuxVar &aux) {
|
||||||
BamBufType inBamBuf(nsgv::gBqsrArg.DUPLEX_IO);
|
BamBufType inBamBuf(nsgv::gBqsrArg.DUPLEX_IO);
|
||||||
|
|
@ -42,7 +74,7 @@ int SerialApplyBQSR(AuxVar &aux) {
|
||||||
|
|
||||||
PerReadCovariateMatrix& readCovariates = aux.readCovariates;
|
PerReadCovariateMatrix& readCovariates = aux.readCovariates;
|
||||||
// 一. 读取并解析recalibrate tables
|
// 一. 读取并解析recalibrate tables
|
||||||
auto& recalTables = *aux.bqsrTable;
|
auto& recalTables = aux.recalTables;
|
||||||
|
|
||||||
// 全局的校正后的bam数组
|
// 全局的校正后的bam数组
|
||||||
auto& recalBams = nsgv::gRecalBams[0];
|
auto& recalBams = nsgv::gRecalBams[0];
|
||||||
|
|
@ -63,12 +95,14 @@ int SerialApplyBQSR(AuxVar &aux) {
|
||||||
auto bams = inBamBuf.GetBamArr();
|
auto bams = inBamBuf.GetBamArr();
|
||||||
spdlog::info("{} reads processed in {} round", readNum, round);
|
spdlog::info("{} reads processed in {} round", readNum, round);
|
||||||
|
|
||||||
if (recalBams.size() < bams.size()) {
|
if (recalBams.capacity() < bams.size()) {
|
||||||
int start = recalBams.size();
|
int start = recalBams.capacity();
|
||||||
recalBams.resize(bams.size());
|
recalBams.resize(bams.size());
|
||||||
for (int i = start; i < recalBams.size(); ++i) {
|
for (int i = start; i < recalBams.size(); ++i) {
|
||||||
recalBams[i] = bam_init1();
|
recalBams[i] = bam_init1();
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
recalBams.resize(bams.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
// 二. 遍历每个bam(read)记录,进行处理
|
// 二. 遍历每个bam(read)记录,进行处理
|
||||||
|
|
@ -153,11 +187,11 @@ int SerialApplyBQSR(AuxVar &aux) {
|
||||||
recalibratedQuals[offset] = quantizedQualityScore;
|
recalibratedQuals[offset] = quantizedQualityScore;
|
||||||
}
|
}
|
||||||
|
|
||||||
// fprintf(gf[4], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid);
|
fprintf(gf[4], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid);
|
||||||
// for (size_t si = 0; si < sd.read_len; ++si) {
|
for (size_t si = 0; si < sd.read_len; ++si) {
|
||||||
// fprintf(gf[4], "%d ", (int)recalibratedQuals[si]);
|
fprintf(gf[4], "%d ", (int)recalibratedQuals[si]);
|
||||||
// }
|
}
|
||||||
// fprintf(gf[4], "\n");
|
fprintf(gf[4], "\n");
|
||||||
|
|
||||||
if (sam_write1(nsgv::gOutBamFp, nsgv::gOutBamHeader, bw->b) < 0) {
|
if (sam_write1(nsgv::gOutBamFp, nsgv::gOutBamHeader, bw->b) < 0) {
|
||||||
spdlog::error("failed writing sam record to \"{}\"", nsgv::gBqsrArg.OUTPUT_FILE.c_str());
|
spdlog::error("failed writing sam record to \"{}\"", nsgv::gBqsrArg.OUTPUT_FILE.c_str());
|
||||||
|
|
@ -179,8 +213,228 @@ int SerialApplyBQSR(AuxVar &aux) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 读取数据线程
|
||||||
|
static void *pipeRead(void *data) {
|
||||||
|
ApplyPipeArg& p = *(ApplyPipeArg*)data;
|
||||||
|
BamBufType inBamBuf(nsgv::gBqsrArg.DUPLEX_IO);
|
||||||
|
inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gBqsrArg.MAX_MEM, RecalFuncs::applyBqsrReadFilterOut);
|
||||||
|
int64_t readNumSum = 0;
|
||||||
|
while(true) {
|
||||||
|
yarn::POSSESS(p.readSig);
|
||||||
|
yarn::WAIT_FOR(p.readSig, yarn::NOT_TO_BE, 1);
|
||||||
|
size_t readNum = 0;
|
||||||
|
if (inBamBuf.ReadStat() >= 0)
|
||||||
|
readNum = inBamBuf.ReadBam();
|
||||||
|
if (readNum < 1) {
|
||||||
|
p.readFinish = 1;
|
||||||
|
yarn::TWIST(p.readSig, yarn::BY, 1);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
spdlog::info("{} reads processed in {} round", readNum, p.readOrder);
|
||||||
|
p.readData = inBamBuf.GetBamArr();
|
||||||
|
|
||||||
|
readNumSum += readNum;
|
||||||
|
inBamBuf.ClearAll();
|
||||||
|
p.readOrder += 1;
|
||||||
|
yarn::TWIST(p.readSig, yarn::BY, 1);
|
||||||
|
}
|
||||||
|
spdlog::info("{} total reads processed", readNumSum);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// 并行处理数据
|
||||||
|
static void thread_worker(void* data, long i, int thid) {
|
||||||
|
ApplyPipeArg& p = *(ApplyPipeArg*)data;
|
||||||
|
auto& aux = p.auxArr[thid];
|
||||||
|
PerReadCovariateMatrix& readCovariates =aux.readCovariates;
|
||||||
|
auto& recalTables = aux.recalTables;
|
||||||
|
// 全局的校正后的bam数组
|
||||||
|
auto& recalBams = p.applyData[p.applyOrder % p.RECAL_BAM_BUF_NUM];
|
||||||
|
auto& sd = aux.sd;
|
||||||
|
auto& bams = p.readData;
|
||||||
|
BamWrap* bw = bams[i];
|
||||||
|
|
||||||
|
if (bam_copy1(recalBams[i], bams[i]->b) == NULL) {
|
||||||
|
spdlog::error("Copy bam error");
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
bw->b = recalBams[i]; // 注意这里的赋值,然后就可以对b进行更改了
|
||||||
|
sd.init();
|
||||||
|
sd.parseForApplyBQSR(bw);
|
||||||
|
sd.rid = i + aux.processedReads;
|
||||||
|
|
||||||
|
// 1. 是否使用original quality来代替当前的quality
|
||||||
|
if (nsgv::gBqsrArg.useOriginalBaseQualities)
|
||||||
|
ReadUtils::resetOriginalBaseQualities(bw->b);
|
||||||
|
|
||||||
|
// 2. 是否将当前的quality保存在tag OQ中
|
||||||
|
if (nsgv::gBqsrArg.emitOriginalQuals)
|
||||||
|
ReadUtils::setOriginalBaseQualsIfNoOQ(bw->b);
|
||||||
|
|
||||||
|
// 3. 计算read的协变量covs
|
||||||
|
PROF_START(TP_covariate);
|
||||||
|
CovariateUtils::ComputeCovariates(sd, aux.header, readCovariates, nsgv::gBqsrArg.computeIndelBQSRTables, 0);
|
||||||
|
PROF_TP_END(TP_covariate);
|
||||||
|
|
||||||
|
// clear indel qualities
|
||||||
|
ReadUtils::removeAttribute(bw->b, "BI");
|
||||||
|
ReadUtils::removeAttribute(bw->b, "BD");
|
||||||
|
|
||||||
|
// 4. 检查read的readGroup tag是否包含在bqsr table里
|
||||||
|
const char* readGroupId = ReadUtils::getReadGroupId(bw->b);
|
||||||
|
auto& covaritesForRead = readCovariates[EventType::BASE_SUBSTITUTION.index];
|
||||||
|
uint8_t* recalibratedQuals = bam_get_qual(bw->b);
|
||||||
|
auto& preUpdateQuals = sd.base_quals;
|
||||||
|
int rgKey = -1;
|
||||||
|
if (ReadGroupCovariate::RgToId.find(std::string(readGroupId)) != ReadGroupCovariate::RgToId.end())
|
||||||
|
rgKey = ReadGroupCovariate::RgToId[std::string(readGroupId)];
|
||||||
|
if (rgKey == -1) {
|
||||||
|
if (nsgv::gBqsrArg.allowMissingReadGroups) {
|
||||||
|
// Given the way the recalibration code is implemented below, we cannot recalibrate a read with a
|
||||||
|
// read group that's not in the recal table.
|
||||||
|
for (int i = 0; i < sd.read_len; i++) {
|
||||||
|
// recalibratedQuals[i] = staticQuantizedMapping != null ? staticQuantizedMapping[preUpdateQuals[i]] :
|
||||||
|
// quantizedQuals.get(preUpdateQuals[i]);
|
||||||
|
recalibratedQuals[i] = nsgv::gQuantizedQuals[preUpdateQuals[i]];
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
spdlog::error(
|
||||||
|
"Read group {} not found in the recalibration table. Use \"--allow-missing-read-group\" command line argument to ignore this "
|
||||||
|
"error.",
|
||||||
|
readGroupId);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// 5. 根据recal tables数据对read的每个碱基分别计算新的质量分数
|
||||||
|
auto& readGroupDatum = recalTables.readGroupTable(EventType::BASE_SUBSTITUTION.index, rgKey);
|
||||||
|
// Note: this loop is under very heavy use in applyBQSR. Keep it slim.
|
||||||
|
for (int offset = 0; offset < sd.read_len; offset++) { // recalibrate all bases in the read
|
||||||
|
// only recalibrate usable qualities (default: >= 6) (the original quality will come from the instrument -- reported quality)
|
||||||
|
if (recalibratedQuals[offset] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
auto& covs = readCovariates[EventType::BASE_SUBSTITUTION.index][offset];
|
||||||
|
// 根据read的协变量数据获取对应的bqsr数据
|
||||||
|
auto& qualityScoreDatum = recalTables.qualityScoreTable(EventType::BASE_SUBSTITUTION.index, rgKey, covs.baseQuality);
|
||||||
|
auto& contextDatum = recalTables.contextTable(EventType::BASE_SUBSTITUTION.index, rgKey, covs.baseQuality, covs.context);
|
||||||
|
auto& cycleDatum = recalTables.cycleTable(EventType::BASE_SUBSTITUTION.index, rgKey, covs.baseQuality, covs.cycle);
|
||||||
|
// 计算校正后的质量分数
|
||||||
|
double priorQualityScore = nsgv::gBqsrArg.globalQScorePrior > 0.0 ? nsgv::gBqsrArg.globalQScorePrior : readGroupDatum.getReportedQuality();
|
||||||
|
double rawRecalibratedQualityScore =
|
||||||
|
RecalFuncs::hierarchicalBayesianQualityEstimate(priorQualityScore, readGroupDatum, qualityScoreDatum, contextDatum, cycleDatum);
|
||||||
|
uint8_t qualIdx = RecalFuncs::getBoundedIntegerQual(rawRecalibratedQualityScore);
|
||||||
|
uint8_t quantizedQualityScore = qualIdx; // nsgv::gQuantizedQuals.at(qualIdx);
|
||||||
|
// TODO: as written the code quantizes *twice* if the static binning is enabled (first time to the dynamic bin). It should be
|
||||||
|
// quantized once.
|
||||||
|
// recalibratedQuals[offset] = staticQuantizedMapping == null ? quantizedQualityScore : staticQuantizedMapping[quantizedQualityScore];
|
||||||
|
recalibratedQuals[offset] = quantizedQualityScore;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doApply(ApplyPipeArg& p) {
|
||||||
|
auto& recalBams = p.applyData[p.applyOrder % p.RECAL_BAM_BUF_NUM];
|
||||||
|
auto& bams = p.readData;
|
||||||
|
if (recalBams.capacity() < bams.size()) {
|
||||||
|
int start = recalBams.capacity(); // 因为这之前的bam1_t都有内存空间了
|
||||||
|
recalBams.resize(bams.size());
|
||||||
|
for (int i = start; i < recalBams.size(); ++i) {
|
||||||
|
recalBams[i] = bam_init1();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
recalBams.resize(bams.size());
|
||||||
|
}
|
||||||
|
kt_for(p.numThread, thread_worker, &p, bams.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
static void* pipeApply(void *data) {
|
||||||
|
ApplyPipeArg& p = *(ApplyPipeArg*)data;
|
||||||
|
while(true) {
|
||||||
|
yarn::POSSESS(p.readSig);
|
||||||
|
yarn::WAIT_FOR(p.readSig, yarn::NOT_TO_BE, 0); // 有数据就行
|
||||||
|
yarn::RELEASE(p.readSig);
|
||||||
|
yarn::POSSESS(p.applySig);
|
||||||
|
yarn::WAIT_FOR(p.applySig, yarn::NOT_TO_BE, p.RECAL_BAM_BUF_NUM);
|
||||||
|
yarn::RELEASE(p.applySig);
|
||||||
|
|
||||||
|
if (p.readFinish) {
|
||||||
|
while(p.applyOrder < p.readOrder) {
|
||||||
|
yarn::POSSESS(p.applySig);
|
||||||
|
yarn::WAIT_FOR(p.applySig, yarn::NOT_TO_BE, p.RECAL_BAM_BUF_NUM);
|
||||||
|
yarn::RELEASE(p.applySig);
|
||||||
|
doApply(p);
|
||||||
|
yarn::POSSESS(p.applySig);
|
||||||
|
p.applyOrder += 1;
|
||||||
|
yarn::TWIST(p.applySig, yarn::BY, 1);
|
||||||
|
}
|
||||||
|
yarn::POSSESS(p.applySig);
|
||||||
|
p.applyFinish = 1;
|
||||||
|
yarn::TWIST(p.applySig, yarn::BY, 1);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
doApply(p);
|
||||||
|
yarn::POSSESS(p.readSig);
|
||||||
|
yarn::TWIST(p.readSig, yarn::BY, -1);
|
||||||
|
|
||||||
|
yarn::POSSESS(p.applySig);
|
||||||
|
p.applyOrder += 1;
|
||||||
|
yarn::TWIST(p.applySig, yarn::BY, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static void doWrite(ApplyPipeArg &p) {
|
||||||
|
auto& recalBams = p.applyData[p.writeOrder % p.RECAL_BAM_BUF_NUM];
|
||||||
|
for (int i = 0; i < recalBams.size(); ++i) {
|
||||||
|
if (sam_write1(nsgv::gOutBamFp, nsgv::gOutBamHeader, recalBams[i]) < 0) {
|
||||||
|
spdlog::error("failed writing sam record to \"{}\"", nsgv::gBqsrArg.OUTPUT_FILE.c_str());
|
||||||
|
sam_close(nsgv::gInBamFp);
|
||||||
|
sam_close(nsgv::gOutBamFp);
|
||||||
|
exit(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void *pipeWrite(void *data) {
|
||||||
|
ApplyPipeArg& p = *(ApplyPipeArg*)data;
|
||||||
|
while (true) {
|
||||||
|
yarn::POSSESS(p.applySig);
|
||||||
|
yarn::WAIT_FOR(p.applySig, yarn::NOT_TO_BE, 0); // 有数据就行
|
||||||
|
yarn::RELEASE(p.applySig);
|
||||||
|
|
||||||
|
if (p.applyFinish) {
|
||||||
|
while (p.writeOrder < p.applyOrder) {
|
||||||
|
yarn::POSSESS(p.applySig);
|
||||||
|
yarn::WAIT_FOR(p.applySig, yarn::NOT_TO_BE, 0); // 有数据就行
|
||||||
|
yarn::RELEASE(p.applySig);
|
||||||
|
doWrite(p);
|
||||||
|
p.writeOrder += 1;
|
||||||
|
}
|
||||||
|
break; // pipeline的最后一步,直接退出就行
|
||||||
|
}
|
||||||
|
doWrite(p);
|
||||||
|
p.writeOrder += 1;
|
||||||
|
|
||||||
|
yarn::POSSESS(p.applySig);
|
||||||
|
yarn::TWIST(p.applySig, yarn::BY, -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
// 并行 apply bqsr
|
// 并行 apply bqsr
|
||||||
int ParallelApplyBQSR(vector<AuxVar> &auxArr) {
|
int ParallelApplyBQSR(vector<AuxVar> &auxArr) {
|
||||||
|
ApplyPipeArg pipeArg(auxArr);
|
||||||
|
pipeArg.numThread = nsgv::gBqsrArg.NUM_THREADS;
|
||||||
|
pthread_t tidArr[3];
|
||||||
|
pthread_create(&tidArr[0], 0, pipeRead, &pipeArg);
|
||||||
|
pthread_create(&tidArr[1], 0, pipeApply, &pipeArg);
|
||||||
|
pthread_create(&tidArr[2], 0, pipeWrite, &pipeArg);
|
||||||
|
for (int i = 0; i < 3; ++i) pthread_join(tidArr[i], 0);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
@ -279,7 +533,7 @@ static void globalInit() {
|
||||||
if (nsgv::gAuxVars[i].faidx == 0)
|
if (nsgv::gAuxVars[i].faidx == 0)
|
||||||
error("[%s] fail to load the fasta index.\n", __func__);
|
error("[%s] fail to load the fasta index.\n", __func__);
|
||||||
// 注意初始化顺序,这个必须在协变量初始化之后,因为需要用到MaximumKeyValue
|
// 注意初始化顺序,这个必须在协变量初始化之后,因为需要用到MaximumKeyValue
|
||||||
nsgv::gAuxVars[i].bqsrTable = &nsgv::gRecalTables;
|
// nsgv::gAuxVars[i].bqsrTable = &nsgv::gRecalTables;
|
||||||
CovariateUtils::InitPerReadCovMat(nsgv::gAuxVars[i].readCovariates);
|
CovariateUtils::InitPerReadCovMat(nsgv::gAuxVars[i].readCovariates);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -308,7 +562,11 @@ static void globalInit() {
|
||||||
*/
|
*/
|
||||||
|
|
||||||
// 读取并解析recalibrate tables
|
// 读取并解析recalibrate tables
|
||||||
RecalUtils::readRecalTables(nsgv::gBqsrArg.BQSR_FILE, nsgv::gBqsrArg, nsgv::gQuantizedQuals, nsgv::gRecalTables);
|
// RecalUtils::readRecalTables(nsgv::gBqsrArg.BQSR_FILE, nsgv::gBqsrArg, nsgv::gQuantizedQuals, nsgv::gRecalTables);
|
||||||
|
RecalUtils::readRecalTables(nsgv::gBqsrArg.BQSR_FILE, nsgv::gBqsrArg, nsgv::gQuantizedQuals, nsgv::gAuxVars[0].recalTables);
|
||||||
|
for (int i = 1; i < nsgv::gBqsrArg.NUM_THREADS; ++i) {
|
||||||
|
nsgv::gAuxVars[i].recalTables.copy(nsgv::gAuxVars[0].recalTables);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// 全局资源释放
|
// 全局资源释放
|
||||||
|
|
@ -333,10 +591,10 @@ int ApplyBQSR() {
|
||||||
|
|
||||||
PROF_START(GP_whole_process);
|
PROF_START(GP_whole_process);
|
||||||
globalInit();
|
globalInit();
|
||||||
// if (nsgv::gBqsrArg.NUM_THREADS == 1)
|
if (nsgv::gBqsrArg.NUM_THREADS == 1)
|
||||||
ret = SerialApplyBQSR(nsgv::gAuxVars[0]); // 串行处理数据,生成recal bams
|
ret = SerialApplyBQSR(nsgv::gAuxVars[0]); // 串行处理数据,生成recal bams
|
||||||
// else
|
else
|
||||||
// ret = ParallelApplyBQSR(nsgv::gAuxVars); // 并行处理数据,生成recal bams
|
ret = ParallelApplyBQSR(nsgv::gAuxVars); // 并行处理数据,生成recal bams
|
||||||
globalDestroy();
|
globalDestroy();
|
||||||
PROF_GP_END(GP_whole_process);
|
PROF_GP_END(GP_whole_process);
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -52,5 +52,5 @@ struct AuxVar {
|
||||||
StableArray<uint8_t> skips; // 该位置是否是已知位点
|
StableArray<uint8_t> skips; // 该位置是否是已知位点
|
||||||
|
|
||||||
// only for apply bqsr
|
// only for apply bqsr
|
||||||
RecalTables* bqsrTable; // 保留一份就够
|
// RecalTables* bqsrTable; // 保留一份就够,no,保留一份不行,因为empiricalQuality需要实时重建,会写入,每个线程要保留一份
|
||||||
};
|
};
|
||||||
|
|
@ -23,6 +23,10 @@ struct Array2D {
|
||||||
Array2D(int dim1, int dim2) { init(dim1, dim2); }
|
Array2D(int dim1, int dim2) { init(dim1, dim2); }
|
||||||
void init(int dim1, int dim2) { d1 = dim1; d2 = dim2; s1 = dim2; cap = d1 * d2; data.resize(cap); }
|
void init(int dim1, int dim2) { d1 = dim1; d2 = dim2; s1 = dim2; cap = d1 * d2; data.resize(cap); }
|
||||||
inline T& operator()(int k1, int k2) { return data[k1 * s1 + k2]; }
|
inline T& operator()(int k1, int k2) { return data[k1 * s1 + k2]; }
|
||||||
|
void copy(const Array2D<T> &a) {
|
||||||
|
init(a.d1, a.d2);
|
||||||
|
for (int i = 0; i < data.size(); ++i) data[i] = a.data[i];
|
||||||
|
}
|
||||||
#define _Foreach2D(array, valName, codes) \
|
#define _Foreach2D(array, valName, codes) \
|
||||||
for (auto& valName : array.data) { \
|
for (auto& valName : array.data) { \
|
||||||
codes; \
|
codes; \
|
||||||
|
|
@ -56,6 +60,10 @@ struct Array3D {
|
||||||
inline T& operator()(int k1, int k2, int k3) {
|
inline T& operator()(int k1, int k2, int k3) {
|
||||||
return data[k1 * s1 + k2 * s2 + k3];
|
return data[k1 * s1 + k2 * s2 + k3];
|
||||||
}
|
}
|
||||||
|
void copy(const Array3D<T>& a) {
|
||||||
|
init(a.d1, a.d2, a.d3);
|
||||||
|
for (int i = 0; i < data.size(); ++i) data[i] = a.data[i];
|
||||||
|
}
|
||||||
|
|
||||||
#define _Foreach3D(array, valName, codes) \
|
#define _Foreach3D(array, valName, codes) \
|
||||||
for (auto& valName : array.data) { \
|
for (auto& valName : array.data) { \
|
||||||
|
|
@ -89,6 +97,10 @@ struct Array4D {
|
||||||
data.resize(cap);
|
data.resize(cap);
|
||||||
}
|
}
|
||||||
inline T& operator()(int k1, int k2, int k3, int k4) { return data[k1 * s1 + k2 * s2 + k3 * s3 + k4]; }
|
inline T& operator()(int k1, int k2, int k3, int k4) { return data[k1 * s1 + k2 * s2 + k3 * s3 + k4]; }
|
||||||
|
void copy(const Array4D<T>& a) {
|
||||||
|
init(a.d1, a.d2, a.d3, a.d4);
|
||||||
|
for (int i = 0; i < data.size(); ++i) data[i] = a.data[i];
|
||||||
|
}
|
||||||
|
|
||||||
#define _Foreach4D(array, valName, codes) \
|
#define _Foreach4D(array, valName, codes) \
|
||||||
for (auto& valName : array.data) { \
|
for (auto& valName : array.data) { \
|
||||||
|
|
|
||||||
|
|
@ -171,6 +171,16 @@ struct RecalDatum {
|
||||||
return empiricalQuality;
|
return empiricalQuality;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 根据输入参数计算empiricalQual
|
||||||
|
double getCalcEmpiricalQuality(const double priorQualityScore) {
|
||||||
|
const uint64_t mismatches = (uint64_t)(getNumMismatches() + 0.5) + SMOOTHING_CONSTANT; // TODO: why add 0.5?
|
||||||
|
const uint64_t observations = getNumObservations() + SMOOTHING_CONSTANT + SMOOTHING_CONSTANT;
|
||||||
|
|
||||||
|
const int empiricalQual = bayesianEstimateOfEmpiricalQuality(observations, mismatches, priorQualityScore);
|
||||||
|
|
||||||
|
return std::min(empiricalQual, (int)MAX_RECALIBRATED_Q_SCORE);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Calculate and cache the empirical quality score from mismatches and observations (expensive operation)
|
* Calculate and cache the empirical quality score from mismatches and observations (expensive operation)
|
||||||
*/
|
*/
|
||||||
|
|
|
||||||
|
|
@ -299,6 +299,8 @@ struct RecalFuncs {
|
||||||
*/
|
*/
|
||||||
static double hierarchicalBayesianQualityEstimate(double priorQualityScore, RecalDatum& readGroupDatum, RecalDatum& qualityScoreDatum,
|
static double hierarchicalBayesianQualityEstimate(double priorQualityScore, RecalDatum& readGroupDatum, RecalDatum& qualityScoreDatum,
|
||||||
RecalDatum& contextDatum, RecalDatum& cycleDatum) {
|
RecalDatum& contextDatum, RecalDatum& cycleDatum) {
|
||||||
|
|
||||||
|
#if 1
|
||||||
double empiricalQualityForReadGroup =
|
double empiricalQualityForReadGroup =
|
||||||
readGroupDatum.getNumObservations() == 0 ? priorQualityScore : readGroupDatum.getEmpiricalQuality(priorQualityScore);
|
readGroupDatum.getNumObservations() == 0 ? priorQualityScore : readGroupDatum.getEmpiricalQuality(priorQualityScore);
|
||||||
double posteriorEmpiricalQualityForReportedQuality = qualityScoreDatum.getNumObservations() == 0
|
double posteriorEmpiricalQualityForReportedQuality = qualityScoreDatum.getNumObservations() == 0
|
||||||
|
|
@ -317,7 +319,26 @@ struct RecalFuncs {
|
||||||
deltaSpecialCovariates +=
|
deltaSpecialCovariates +=
|
||||||
cycleDatum.getEmpiricalQuality(posteriorEmpiricalQualityForReportedQuality) - posteriorEmpiricalQualityForReportedQuality;
|
cycleDatum.getEmpiricalQuality(posteriorEmpiricalQualityForReportedQuality) - posteriorEmpiricalQualityForReportedQuality;
|
||||||
}
|
}
|
||||||
|
#else
|
||||||
|
double empiricalQualityForReadGroup =
|
||||||
|
readGroupDatum.getNumObservations() == 0 ? priorQualityScore : readGroupDatum.getCalcEmpiricalQuality(priorQualityScore);
|
||||||
|
double posteriorEmpiricalQualityForReportedQuality = qualityScoreDatum.getNumObservations() == 0
|
||||||
|
? empiricalQualityForReadGroup
|
||||||
|
: qualityScoreDatum.getCalcEmpiricalQuality(empiricalQualityForReadGroup);
|
||||||
|
|
||||||
|
double deltaSpecialCovariates = 0.0;
|
||||||
|
// At this point we stop being iterative; the special covariates (context and cycle by default) are treated differently.
|
||||||
|
if (contextDatum.getNumObservations() > 0) {
|
||||||
|
// TODO: the prior is ignored if the empirical quality for the datum is already cached.
|
||||||
|
deltaSpecialCovariates +=
|
||||||
|
contextDatum.getCalcEmpiricalQuality(posteriorEmpiricalQualityForReportedQuality) - posteriorEmpiricalQualityForReportedQuality;
|
||||||
|
}
|
||||||
|
if (cycleDatum.getNumObservations() > 0) {
|
||||||
|
// TODO: the prior is ignored if the empirical quality for the datum is already cached.
|
||||||
|
deltaSpecialCovariates +=
|
||||||
|
cycleDatum.getCalcEmpiricalQuality(posteriorEmpiricalQualityForReportedQuality) - posteriorEmpiricalQualityForReportedQuality;
|
||||||
|
}
|
||||||
|
#endif
|
||||||
return posteriorEmpiricalQualityForReportedQuality + deltaSpecialCovariates;
|
return posteriorEmpiricalQualityForReportedQuality + deltaSpecialCovariates;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -32,6 +32,14 @@ struct RecalTables {
|
||||||
|
|
||||||
RecalTables(int _numReadGroups) { init(_numReadGroups); }
|
RecalTables(int _numReadGroups) { init(_numReadGroups); }
|
||||||
|
|
||||||
|
void copy(const RecalTables &r) {
|
||||||
|
numReadGroups = r.numReadGroups;
|
||||||
|
readGroupTable.copy(r.readGroupTable);
|
||||||
|
qualityScoreTable.copy(r.qualityScoreTable);
|
||||||
|
contextTable.copy(r.contextTable);
|
||||||
|
cycleTable.copy(r.cycleTable);
|
||||||
|
}
|
||||||
|
|
||||||
void init(int _numReadGroups) {
|
void init(int _numReadGroups) {
|
||||||
numReadGroups = _numReadGroups;
|
numReadGroups = _numReadGroups;
|
||||||
// 初始化readgroup和quality两个table
|
// 初始化readgroup和quality两个table
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ struct StableArray {
|
||||||
size_t idx = 0;
|
size_t idx = 0;
|
||||||
void clear() { idx = 0; }
|
void clear() { idx = 0; }
|
||||||
size_t size() const { return idx; }
|
size_t size() const { return idx; }
|
||||||
|
size_t capacity() const { return arr.size(); }
|
||||||
bool empty() { return idx == 0; }
|
bool empty() { return idx == 0; }
|
||||||
void reserve(size_t _size) {
|
void reserve(size_t _size) {
|
||||||
if (arr.size() < _size)
|
if (arr.size() < _size)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue