diff --git a/ext/klib/kthread.c b/ext/klib/kthread.c index b88cd19..6ce7165 100644 --- a/ext/klib/kthread.c +++ b/ext/klib/kthread.c @@ -13,6 +13,11 @@ typedef struct { long i; } ktf_worker_t; +typedef struct { + struct kt_steal_for_t* t; + long i; +} ktf_steal_worker_t; + typedef struct kt_for_t { int n_threads; long n; @@ -21,6 +26,14 @@ typedef struct kt_for_t { void *data; } kt_for_t; +typedef struct kt_steal_for_t { + int n_threads; + long n; + ktf_steal_worker_t* w; + void (*func)(void*, long, int, int); + void* data; +} kt_steal_for_t; + static inline long steal_work(kt_for_t *t) { int i, min_i = -1; @@ -64,6 +77,46 @@ void kt_for(int n_threads, void (*func)(void*,long,int), void *data, long n) } } +static inline long steal_steal_work(kt_steal_for_t* t) { + int i, min_i = -1; + long k, min = LONG_MAX; + for (i = 0; i < t->n_threads; ++i) + if (min > t->w[i].i) + min = t->w[i].i, min_i = i; + k = __sync_fetch_and_add(&t->w[min_i].i, t->n_threads); + return k >= t->n ? -1 : k; +} + +static void* ktf_worker_steal(void* data) { + ktf_steal_worker_t* w = (ktf_steal_worker_t*)data; + long i; + for (;;) { + i = __sync_fetch_and_add(&w->i, w->t->n_threads); + if (i >= w->t->n) + break; + w->t->func(w->t->data, i, w - w->t->w, 0); + } + while ((i = steal_steal_work(w->t)) >= 0) w->t->func(w->t->data, i, w - w->t->w, 1); + pthread_exit(0); +} + +void kt_for_steal(int n_threads, void (*func)(void*, long, int, int), void* data, long n) { + if (n_threads > 1) { + int i; + kt_steal_for_t t; + pthread_t* tid; + t.func = func, t.data = data, t.n_threads = n_threads, t.n = n; + t.w = (ktf_steal_worker_t*)alloca(n_threads * sizeof(ktf_steal_worker_t)); + tid = (pthread_t*)alloca(n_threads * sizeof(pthread_t)); + for (i = 0; i < n_threads; ++i) t.w[i].t = &t, t.w[i].i = i; + for (i = 0; i < n_threads; ++i) pthread_create(&tid[i], 0, ktf_worker_steal, &t.w[i]); + for (i = 0; i < n_threads; ++i) pthread_join(tid[i], 0); + } else { + long j; + for (j = 0; j < n; ++j) func(data, j, 0, 0); + } +} + static void* ktf_worker_no_steal(void* data) { ktf_worker_t* w = (ktf_worker_t*)data; long i; diff --git a/ext/klib/kthread.h b/ext/klib/kthread.h index a0f139e..2e418e4 100644 --- a/ext/klib/kthread.h +++ b/ext/klib/kthread.h @@ -6,6 +6,7 @@ extern "C" { #endif void kt_for(int n_threads, void (*func)(void*,long,int), void *data, long n); +void kt_for_steal(int n_threads, void (*func)(void*, long, int, int), void* data, long n); void kt_for_no_steal(int n_threads, void (*func)(void*, long, int), void* data, long n); void kt_pipeline(int n_threads, void *(*func)(void*, int, void*), void *shared_data, int n_steps); diff --git a/src/bqsr/aux_arg.h b/src/bqsr/aux_arg.h index a45b60c..1b48a4d 100644 --- a/src/bqsr/aux_arg.h +++ b/src/bqsr/aux_arg.h @@ -28,7 +28,7 @@ struct AuxVar { //const static int REF_CONTEXT_PAD = 3; // 需要做一些填充 //const static int REFERENCE_HALF_WINDOW_LENGTH = 150; // 需要额外多取出一些ref序列,防止边界效应 - static constexpr int BAM_BLOCK_NUM = 1; // 每个线程每次处理1k个bam记录 + static constexpr int BAM_BLOCK_NUM = 1000; // 每个线程每次处理1k个bam记录 static int64_t processedReads; sam_hdr_t* header = nullptr; // bam header diff --git a/src/bqsr/bqsr_entry.cpp b/src/bqsr/bqsr_entry.cpp index b4f774a..5e627cf 100644 --- a/src/bqsr/bqsr_entry.cpp +++ b/src/bqsr/bqsr_entry.cpp @@ -114,44 +114,41 @@ void roundTableValues(RecalTables& rt) { static void printRecalTables(const RecalTables& rt) { _Foreach2D(rt.readGroupTable, val, { if (val.numObservations > 0) { - fprintf(gf[0], "%ld %f %f\n", val.numObservations, val.numMismatches, val.reportedQuality); + fprintf(gf[0], "%ld %f %f\n", val.numObservations, val.getNumMismatches(), val.reportedQuality); } }); _Foreach3D(rt.qualityScoreTable, val, { if (val.numObservations > 0) { - fprintf(gf[1], "%ld %f %f\n", val.numObservations, val.numMismatches, val.reportedQuality); + fprintf(gf[1], "%ld %f %f\n", val.numObservations, val.getNumMismatches(), val.reportedQuality); } }); _Foreach4D(rt.contextTable, val, { if (val.numObservations > 0) { - fprintf(gf[2], "%ld %f %f\n", val.numObservations, val.numMismatches, val.reportedQuality); + fprintf(gf[2], "%ld %f %f\n", val.numObservations, val.getNumMismatches(), val.reportedQuality); } }); _Foreach4D(rt.cycleTable, val, { if (val.numObservations > 0) { - fprintf(gf[3], "%ld %f %f\n", val.numObservations, val.numMismatches, val.reportedQuality); + fprintf(gf[3], "%ld %f %f\n", val.numObservations, val.getNumMismatches(), val.reportedQuality); } }); } // 串行bqsr -int SerialBQSR(AuxVar &aux1) { +int SerialBQSR(AuxVar &aux) { BamBufType inBamBuf(nsgv::gBqsrArg.DUPLEX_IO); inBamBuf.Init(nsgv::gInBamFp, nsgv::gInBamHeader, nsgv::gBqsrArg.MAX_MEM, bqsrReadFilterOut); int64_t readNumSum = 0; int round = 0; - // PerReadCovariateMatrix &readCovariates = aux.readCovariates; - // RecalTables& recalTables = aux.recalTables; - // SamData& sd = aux.sd; - // StableArray&isSNP = aux.isSNP, &isIns = aux.isIns, &isDel = aux.isDel; // 该位置是否是SNP, indel位置,0不是,1是 - // StableArray &baqArray = aux.baqArray; - // StableArray &snpErrors = aux.snpErrors, &insErrors = aux.insErrors, &delErrors = aux.delErrors; - // StableArray &skips = aux.skips; // 该位置是否是已知位点 + PerReadCovariateMatrix &readCovariates = aux.readCovariates; + RecalTables& recalTables = aux.recalTables; + SamData& sd = aux.sd; + StableArray&isSNP = aux.isSNP, &isIns = aux.isIns, &isDel = aux.isDel; // 该位置是否是SNP, indel位置,0不是,1是 + StableArray &baqArray = aux.baqArray; + StableArray &snpErrors = aux.snpErrors, &insErrors = aux.insErrors, &delErrors = aux.delErrors; + StableArray &skips = aux.skips; // 该位置是否是已知位点 - int numProcessed = 0; - int numthreads = 2; - int BLOCK_NUM = AuxVar::BAM_BLOCK_NUM; while (true) { ++round; // 一. 读取bam数据 @@ -164,179 +161,141 @@ int SerialBQSR(AuxVar &aux1) { auto bams = inBamBuf.GetBamArr(); spdlog::info("{} reads processed in {} round", readNum, round); - int numBLocks = (bams.size() + BLOCK_NUM - 1) / BLOCK_NUM; - int blocksPerThread = (numBLocks + numthreads - 1) / numthreads; - int spanBlocks = numthreads * BLOCK_NUM; - // 二. 遍历每个bam(read)记录,进行处理 - for (int j = 0; j < numthreads; ++j) { // 模拟多线程 - AuxVar &aux = nsgv::gAuxVars[j]; - PerReadCovariateMatrix& readCovariates = aux.readCovariates; - RecalTables& recalTables = aux.recalTables; - SamData& sd = aux.sd; - StableArray&isSNP = aux.isSNP, &isIns = aux.isIns, &isDel = aux.isDel; // 该位置是否是SNP, indel位置,0不是,1是 - StableArray& baqArray = aux.baqArray; - StableArray&snpErrors = aux.snpErrors, &insErrors = aux.insErrors, &delErrors = aux.delErrors; - StableArray& skips = aux.skips; // 该位置是否是已知位点 - for (int k = 0; k < blocksPerThread; ++k) - for (int m = 0; m < BLOCK_NUM; ++m) { - int i = j * BLOCK_NUM + k * spanBlocks + m; - if (i >= bams.size()) break; - ++numProcessed; - // for (int i = 0; i < bams.size(); ++i) { - // if (i % 100 == 0) - // spdlog::info("Processing read idx: {}", i); - // 1. - // 对每个read,需要检查cigar是否合法,即没有两个连续的相同的cigar,而且需要将首尾的deletion处理掉,目前看好像没啥影响,我们忽略这一步 - // 2. 对质量分数长度跟碱基长度不匹配的read,缺少的质量分数用默认值补齐,先忽略,后边有需要再处理 - // 3. 如果bam文件之前做过bqsr,tag中包含OQ(originnal - // quality,原始质量分数),检查用户参数里是否指定用原始质量分数进行bqsr,如果是则将质量分数替换为OQ,否则忽略OQ,先忽略 - // spdlog::info("bam idx: {}", i); - BamWrap* bw = bams[i]; - sd.init(); - sd.parseBasic(bw); - sd.rid = i + readNumSum; - if (sd.read_len <= 0) - continue; + for (int i = 0; i < bams.size(); ++i) { + // 1. + // 对每个read,需要检查cigar是否合法,即没有两个连续的相同的cigar,而且需要将首尾的deletion处理掉,目前看好像没啥影响,我们忽略这一步 + // 2. 对质量分数长度跟碱基长度不匹配的read,缺少的质量分数用默认值补齐,先忽略,后边有需要再处理 + // 3. 如果bam文件之前做过bqsr,tag中包含OQ(originnal + // quality,原始质量分数),检查用户参数里是否指定用原始质量分数进行bqsr,如果是则将质量分数替换为OQ,否则忽略OQ,先忽略 + // spdlog::info("bam idx: {}", i); + BamWrap* bw = bams[i]; + sd.init(); + sd.parseBasic(bw); + sd.rid = i + readNumSum; + if (sd.read_len <= 0) + continue; - PROF_START(clip_read); - // 4. 对read的两端进行检测,去除(hardclip)adapter - ReadTransformer::hardClipAdaptorSequence(bw, sd); - if (sd.read_len <= 0) - continue; - // 5. 然后再去除softclip部分 - ReadTransformer::hardClipSoftClippedBases(bw, sd); - if (sd.read_len <= 0) - continue; - // 应用所有的变换,计算samdata的相关信息 - sd.applyTransformations(); - PROF_END(gprof[GP_clip_read], clip_read); + PROF_START(clip_read); + // 4. 对read的两端进行检测,去除(hardclip)adapter + ReadTransformer::hardClipAdaptorSequence(bw, sd); + if (sd.read_len <= 0) + continue; + // 5. 然后再去除softclip部分 + ReadTransformer::hardClipSoftClippedBases(bw, sd); + if (sd.read_len <= 0) + continue; + // 应用所有的变换,计算samdata的相关信息 + sd.applyTransformations(); + PROF_END(gprof[GP_clip_read], clip_read); - // 6. 更新每个read的platform信息,好像没啥用,暂时忽略 - const int nErrors = RecalFuncs::calculateIsSNPOrIndel(aux, sd, isSNP, isIns, isDel); + const char* qname = bam_get_qname(sd.bw->b); + // fprintf(gf[4], "%ld %d %d %d\n", sd.rid, sd.read_len, 1 + BamWrap::bam_pos(sd.start_pos), 1 + BamWrap::bam_pos(sd.end_pos)); + // fprintf(gf[4], "%s %d %d %d %d\n", qname, sd.bw->b->core.flag, sd.read_len, 1 + BamWrap::bam_pos(sd.start_pos), 1 + BamWrap::bam_pos(sd.end_pos)); - /*fprintf(gf[0], "%d\t", sd.read_len); - for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[0], "%d ", isSNP[ii]); - fprintf(gf[0], "\n"); - fprintf(gf[1], "%d\t", sd.read_len); - for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[1], "%d ", isIns[ii]); - fprintf(gf[1], "\n"); - fprintf(gf[2], "%d\t", sd.read_len); - for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[2], "%d ", isDel[ii]); - fprintf(gf[2], "\n"); - */ + // 6. 更新每个read的platform信息,好像没啥用,暂时忽略 + const int nErrors = RecalFuncs::calculateIsSNPOrIndel(aux, sd, isSNP, isIns, isDel); - // 7. 计算baqArray - // BAQ = base alignment quality - // note for efficiency reasons we don't compute the BAQ array unless we actually have - // some error to marginalize over. For ILMN data ~85% of reads have no error - // vector baqArray; - bool baqCalculated = false; - if (nErrors == 0 || !nsgv::gBqsrArg.enableBAQ) { - baqCalculated = BAQ::flatBAQArray(sd, baqArray); - } else { - // baqCalculated = calculateBAQArray(nsgv::gAuxVars[0], baq, sd, baqArray); - } - if (!baqCalculated) - continue; - // 到这里,基本的数据都准备好了,后续就是进行bqsr的统计了 + // fprintf(gf[0], "%d\t", sd.read_len); + // for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[0], "%d ", isSNP[ii]); + // fprintf(gf[0], "\n"); + // fprintf(gf[1], "%d\t", sd.read_len); + // for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[1], "%d ", isIns[ii]); + // fprintf(gf[1], "\n"); + // fprintf(gf[2], "%d\t", sd.read_len); + // for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[2], "%d ", isDel[ii]); + // fprintf(gf[2], "\n"); - // 8. 计算这条read对应的协变量 - PROF_START(covariate); - CovariateUtils::ComputeCovariates(sd, aux.header, readCovariates, true); - PROF_END(gprof[GP_covariate], covariate); + // 7. 计算baqArray + // BAQ = base alignment quality + // note for efficiency reasons we don't compute the BAQ array unless we actually have + // some error to marginalize over. For ILMN data ~85% of reads have no error + // vector baqArray; + bool baqCalculated = false; + if (nErrors == 0 || !nsgv::gBqsrArg.enableBAQ) { + baqCalculated = BAQ::flatBAQArray(sd, baqArray); + } else { + // baqCalculated = calculateBAQArray(nsgv::gAuxVars[0], baq, sd, baqArray); + } + if (!baqCalculated) + continue; + // 到这里,基本的数据都准备好了,后续就是进行bqsr的统计了 - // fprintf(gf[3], "%ld %ld %d %ld\n", sd.rid, readCovariates.size(), sd.read_len, readCovariates[0][0].size()); - // for (auto &arr1 : readCovariates) { - // for (size_t si = 0; si < sd.read_len; ++si) { - // for (auto &val : arr1[si]) { - // fprintf(gf[3], "%d ", val); - // } - // } - // } - // fprintf(gf[3], "\n"); + // 8. 计算这条read对应的协变量 + PROF_START(covariate); + CovariateUtils::ComputeCovariates(sd, aux.header, readCovariates, true); + PROF_END(gprof[GP_covariate], covariate); - // fprintf(gf[3], "%ld %d\n", sd.rid, sd.read_len); - // { - // auto& arr1 = readCovariates[0]; - // { - // for (int pos = 0; pos < sd.read_len; ++pos) { - // fprintf(gf[3], "%d %d\n", pos, arr1[pos][2]); - // } - // } - // } - // fprintf(gf[3], "\n"); + // fprintf(gf[4], "%ld %d\n", sd.rid, sd.read_len); + // for (auto &arr1 : readCovariates) { + // for (size_t si = 0; si < sd.read_len; ++si) { + // fprintf(gf[4], "%d %d %d %d ", arr1[si].readGroup, arr1[si].baseQuality, arr1[si].context, arr1[si].cycle); + // } + // } + // fprintf(gf[4], "\n"); - // 9. 计算这条read需要跳过的位置 - PROF_START(read_vcf); - RecalFuncs::calculateKnownSites(sd, aux.vcfArr, aux.header, skips); - for (int ii = 0; ii < sd.read_len; ++ii) { - skips[ii] = skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) || - sd.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN; - } - PROF_GP_END(read_vcf); -#if 1 - int fidx = 0; - if (sd.rid % 2 == 0) fidx = 0; - else fidx = 1; - fprintf(gf[fidx], "%ld %d\t", sd.rid, sd.read_len); - for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[fidx], "%d ", skips[ii] ? 1 : 0); - fprintf(gf[fidx], "\n"); + // fprintf(gf[3], "%ld %d\n", sd.rid, sd.read_len); + // { + // auto& arr1 = readCovariates[0]; + // { + // for (int pos = 0; pos < sd.read_len; ++pos) { + // fprintf(gf[3], "%d %d\n", pos, arr1[pos][2]); + // } + // } + // } + // fprintf(gf[3], "\n"); + + // 9. 计算这条read需要跳过的位置 + PROF_START(read_vcf); + RecalFuncs::calculateKnownSites(sd, aux.vcfArr, aux.header, skips); + for (int ii = 0; ii < sd.read_len; ++ii) { + skips[ii] = skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) || + sd.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN; + } + PROF_GP_END(read_vcf); +#if 0 + int fidx = 0; + if (sd.rid % 2 == 0) + fidx = 0; + else + fidx = 1; + fprintf(gf[fidx], "%ld %d\t", sd.rid, sd.read_len); + for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[fidx], "%d ", skips[ii] ? 1 : 0); + fprintf(gf[fidx], "\n"); #endif - // fprintf(gf[0], "%ld %d\t", sd.rid, sd.read_len); - // for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[0], "%d ", skips[ii] ? 1 : 0); - // fprintf(gf[0], "\n"); + // fprintf(gf[0], "%ld %d\t", sd.rid, sd.read_len); + // for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[0], "%d ", skips[ii] ? 1 : 0); + // fprintf(gf[0], "\n"); - // 10. 根据BAQ进一步处理snp,indel,得到处理后的数据 - PROF_START(frac_err); - RecalFuncs::calculateFractionalErrorArray(isSNP, baqArray, snpErrors); - RecalFuncs::calculateFractionalErrorArray(isIns, baqArray, insErrors); - RecalFuncs::calculateFractionalErrorArray(isDel, baqArray, delErrors); - PROF_GP_END(frac_err); + // 10. 根据BAQ进一步处理snp,indel,得到处理后的数据 + PROF_START(frac_err); + RecalFuncs::calculateFractionalErrorArray(isSNP, baqArray, snpErrors); + RecalFuncs::calculateFractionalErrorArray(isIns, baqArray, insErrors); + RecalFuncs::calculateFractionalErrorArray(isDel, baqArray, delErrors); + PROF_GP_END(frac_err); - // aggregate all of the info into our info object, and update the data - // 11. 合并之前计算的数据,得到info,并更新bqsr table数据 - ReadRecalInfo info(sd, readCovariates, skips, snpErrors, insErrors, delErrors); + // aggregate all of the info into our info object, and update the data + // 11. 合并之前计算的数据,得到info,并更新bqsr table数据 + ReadRecalInfo info(sd, readCovariates, skips, snpErrors, insErrors, delErrors); - PROF_START(update_info); - RecalUtils::updateRecalTablesForRead(info, recalTables); - PROF_END(gprof[GP_update_info], update_info); - } + PROF_START(update_info); + RecalUtils::updateRecalTablesForRead(info, recalTables); + PROF_END(gprof[GP_update_info], update_info); } readNumSum += readNum; inBamBuf.ClearAll(); // } -#if 0 - printRecalTables(recalTables); -#endif - spdlog::info("read count: {}", readNumSum); - spdlog::info("processed count: {}", numProcessed); - auto& auxArr = nsgv::gAuxVars; - RecalTables& recalTables = auxArr[0].recalTables; - for (int i = 0; i < numthreads; ++i) spdlog::info("thread {} processed reads {}.", i, auxArr[i].threadProcessedReads); - for (int i = 1; i < numthreads; ++i) { - auxArr[0].threadProcessedReads += auxArr[i].threadProcessedReads; - _Foreach3DK(auxArr[i].recalTables.qualityScoreTable, qualDatum, { - if (qualDatum.numObservations > 0) { - recalTables.qualityScoreTable(k1, k2, k3).increment(qualDatum); - } - }); - _Foreach4DK(auxArr[i].recalTables.contextTable, contextDatum, { - if (contextDatum.numObservations > 0) { - recalTables.contextTable(k1, k2, k3, k4).increment(contextDatum); - } - }); - _Foreach4DK(auxArr[i].recalTables.cycleTable, cycleDatum, { - if (cycleDatum.numObservations > 0) { - recalTables.cycleTable(k1, k2, k3, k4).increment(cycleDatum); - } - }); - } + spdlog::info("read count: {}", readNumSum); + // 12. 创建总结数据 collapseQualityScoreTableToReadGroupTable(recalTables.readGroupTable, recalTables.qualityScoreTable); roundTableValues(recalTables); +#if 1 + printRecalTables(recalTables); +#endif + // 13. 量化质量分数 QuantizationInfo quantInfo(recalTables, nsgv::gBqsrArg.QUANTIZING_LEVELS); @@ -347,7 +306,7 @@ int SerialBQSR(AuxVar &aux1) { } // 多线程处理bam数据, tmd是乱序的? -static void thread_worker(void* data, long idx, int tid) { +static void thread_worker(void* data, long idx, int tid, int steal) { AuxVar& aux = (*(vector*)data)[tid]; auto& readCovariates = aux.readCovariates; RecalTables& recalTables = aux.recalTables; @@ -357,7 +316,8 @@ static void thread_worker(void* data, long idx, int tid) { StableArray&snpErrors = aux.snpErrors, &insErrors = aux.insErrors, &delErrors = aux.delErrors; StableArray& skips = aux.skips; // 该位置是否是已知位点 auto &bams = *aux.bamArr; - // for (auto& vcf : aux.vcfArr) vcf.knownSites.clear(); + if (steal) + for (auto& vcf : aux.vcfArr) vcf.knownSites.clear(); #if 1 int startIdx = idx * aux.BAM_BLOCK_NUM; int stopIdx = std::min((size_t)(idx + 1) * aux.BAM_BLOCK_NUM, bams.size()); @@ -404,7 +364,7 @@ static void thread_worker(void* data, long idx, int tid) { skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) || sd.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN; } // PROF_GP_END(read_vcf); -#if 1 +#if 0 int fidx = 0 + 2 * tid; //if (sd.rid % 2 == 0) fidx = 0 + 2 * tid; //else fidx = 1 + 2 * tid; @@ -447,9 +407,9 @@ int ParallelBQSR(vector& auxArr) { spdlog::info("{} reads processed in {} round", readNum, round); #if 1 - kt_for_no_steal(auxArr.size(), thread_worker, &auxArr, (readNum + AuxVar::BAM_BLOCK_NUM - 1) / AuxVar::BAM_BLOCK_NUM); + kt_for_steal(auxArr.size(), thread_worker, &auxArr, (readNum + AuxVar::BAM_BLOCK_NUM - 1) / AuxVar::BAM_BLOCK_NUM); #else - kt_for(auxArr.size(), thread_worker, &auxArr, auxArr.size()); + kt_for_steal(auxArr.size(), thread_worker, &auxArr, auxArr.size()); #endif readNumSum += readNum; AuxVar::processedReads += readNum; @@ -457,9 +417,9 @@ int ParallelBQSR(vector& auxArr) { } spdlog::info("read count: {}", readNumSum); - // 合并各个线程的结果 RecalTables& recalTables = auxArr[0].recalTables; + // printRecalTables(recalTables); for (int i = 0; i < auxArr.size(); ++i) spdlog::info("thread {} processed reads {}.", i, auxArr[i].threadProcessedReads); for (int i = 1; i < auxArr.size(); ++i) { @@ -481,13 +441,13 @@ int ParallelBQSR(vector& auxArr) { }); } spdlog::info("All processed reads {}.", auxArr[0].threadProcessedReads); - - // printRecalTables(recalTables); // 创建总结数据 collapseQualityScoreTableToReadGroupTable(recalTables.readGroupTable, recalTables.qualityScoreTable); roundTableValues(recalTables); + printRecalTables(recalTables); + // 量化质量分数 QuantizationInfo quantInfo(recalTables, nsgv::gBqsrArg.QUANTIZING_LEVELS); @@ -583,9 +543,9 @@ int BaseRecalibrator() { PROF_START(whole_process); globalInit(); - //if (nsgv::gBqsrArg.NUM_THREADS == 1) - // ret = SerialBQSR(nsgv::gAuxVars[0]); // 串行处理数据,生成recal table - //else + if (nsgv::gBqsrArg.NUM_THREADS == 1) + ret = SerialBQSR(nsgv::gAuxVars[0]); // 串行处理数据,生成recal table + else ret = ParallelBQSR(nsgv::gAuxVars); // 并行处理数据,生成recal table globalDestroy(); sam_close(nsgv::gInBamFp); diff --git a/src/bqsr/recal_funcs.h b/src/bqsr/recal_funcs.h index d586591..c5808c8 100644 --- a/src/bqsr/recal_funcs.h +++ b/src/bqsr/recal_funcs.h @@ -110,25 +110,21 @@ struct RecalFuncs { int64_t startPos = bw->start_pos(); // 闭区间 int64_t endPos = bw->end_pos(); // 闭区间 knownSites.resize_fill(sd.read_len, 0); - // return; // update vcfs for (auto& vcf : vcfs) { -#if 1 + if (!vcf.knownSites.empty() && vcf.knownSites.back().right < startPos) + vcf.knownSites.clear(); // 清理旧的interval while (!vcf.knownSites.empty()) { auto& intv = vcf.knownSites.front(); - // spdlog::info("intv bam {}, {}", intv.right, startPos); if (intv.right < startPos) vcf.knownSites.pop_front(); else break; } -// #endif if (!vcf.knownSites.empty() && vcf.knownSites.back().left > endPos) // 此时vcf的区域包含bam,不需要读取 continue; -#endif - vcf.knownSites.clear(); // 读取新的interval int64_t fpos, flen; endPos = std::max(startPos + MAX_SITES_INTERVAL, endPos); @@ -155,7 +151,7 @@ struct RecalFuncs { tid = sam_hdr_name2tid(samHdr, stid.c_str()); int64_t varStart = BamWrap::bam_global_pos(tid, pos); Interval varIntv(varStart, varStart + ref.size() - 1); - if (varIntv.right >= readIntv.left) { + if (varIntv.overlaps(readIntv)) { vcf.knownSites.push_back(Interval(tid, pos - 1, pos - 1 + ref.size() - 1)); // 闭区间 } get_line_from_buf(buf, flen, &cur, &line); @@ -170,12 +166,12 @@ struct RecalFuncs { if (intv.left > sd.softEnd()) break; // 计算对应的位点 - ReadIdxCigar rcStart = sd.getReadIndexForReferenceCoordinate(intv.left); + ReadIdxCigar rcStart = sd.getReadIndexForReferenceCoordinate(intv.contigLeft()); int featureStartOnRead = rcStart.readIdx == SamData::READ_INDEX_NOT_FOUND ? 0 : rcStart.readIdx; if (rcStart.cigarOp == 'D') { --featureStartOnRead; } - ReadIdxCigar rcEnd = sd.getReadIndexForReferenceCoordinate(intv.right); + ReadIdxCigar rcEnd = sd.getReadIndexForReferenceCoordinate(intv.contigRight()); int featureEndOnRead = rcEnd.readIdx == SamData::READ_INDEX_NOT_FOUND ? sd.read_len : rcEnd.readIdx; if (featureStartOnRead > sd.read_len) { featureStartOnRead = featureEndOnRead = sd.read_len; diff --git a/src/bqsr/recal_utils.h b/src/bqsr/recal_utils.h index d925388..e9539ab 100644 --- a/src/bqsr/recal_utils.h +++ b/src/bqsr/recal_utils.h @@ -21,6 +21,7 @@ #include "util/utils.h" #include "covariate.h" #include "read_recal_info.h" +#include "util/debug.h" struct RecalUtils { @@ -63,6 +64,7 @@ struct RecalUtils { CovariateValues& cv = readCovars[event.index][offset]; uint8_t qual = info.getQual(event, offset); double isError = info.getErrorFraction(event, offset); + // fprintf(gf[4], "%d %d %f\n", offset, qual, isError); // 处理quality score covariate qualityScoreTable(event.index, cv.readGroup, cv.baseQuality).increment(1, isError, cv.baseQuality); @@ -77,16 +79,12 @@ struct RecalUtils { } } } - // fprintf(gf[3], "%ld %d %ld\n", read.rid, read.read_len, read.start_pos+1); - // for (auto& arr1 : qualityScoreTable.data) { - // for (size_t si = 0; si < arr1.size(); ++si) { - // for (auto &val : arr1[si]) { - // if (val.numObservations > 0) - // fprintf(gf[3], "%ld %f %f ", val.numObservations, val.getNumMismatches(), val.reportedQuality); - // } - // } - // } - // fprintf(gf[3], "\n"); + // fprintf(gf[4], "%ld %d %ld\n", read.rid, read.read_len, read.start_pos+1); + // _Foreach3D(qualityScoreTable, val, { + // if (val.numObservations > 0) + // fprintf(gf[4], "%ld %f %f ", val.numObservations, val.getNumMismatches(), val.reportedQuality); + // }); + // fprintf(gf[4], "\n"); // fprintf(gf[3], "%ld %d %ld\n", read.rid, read.read_len, read.start_pos+1); // for (auto& arr1 : contextTable.data) { diff --git a/src/util/bam_buf.cpp b/src/util/bam_buf.cpp index c43cc59..54a9149 100644 --- a/src/util/bam_buf.cpp +++ b/src/util/bam_buf.cpp @@ -17,8 +17,10 @@ int BamBuf::ReadBam() { int read_num = 0; if (handle_last) { // 处理上次读入的最后一个bam if (has_enough_space()) { // 必须调用,在边界处调整memffset - ++read_num; - append_one_bam(); + if (filter_out == nullptr || !filter_out(bw->b)) { // 这里也要加过滤器 + ++read_num; + append_one_bam(); + } } else { return read_num; // 还是没空间 } diff --git a/src/util/debug.cpp b/src/util/debug.cpp index c7b6e8b..51ecf2e 100644 --- a/src/util/debug.cpp +++ b/src/util/debug.cpp @@ -9,12 +9,12 @@ #include -FILE* gf[4] = {0}; +FILE* gf[DEBUG_FILE_NUM] = {0}; int open_debug_files() { char fn[1024] = {0}; int i = 0; - for (; i < 4; ++i) { + for (; i < DEBUG_FILE_NUM; ++i) { sprintf(fn, "./test/fp%d.txt", i); gf[i] = fopen(fn, "w"); } @@ -23,7 +23,7 @@ int open_debug_files() { int close_debug_files() { int i = 0; - for (; i < 4; ++i) { + for (; i < DEBUG_FILE_NUM; ++i) { if (gf[i] != 0) fclose(gf[i]); } diff --git a/src/util/debug.h b/src/util/debug.h index 106be74..5f875fe 100644 --- a/src/util/debug.h +++ b/src/util/debug.h @@ -20,7 +20,9 @@ //////////////////////////////////////////////////////////////// -extern FILE *gf[4]; +#define DEBUG_FILE_NUM 5 + +extern FILE* gf[DEBUG_FILE_NUM]; int open_debug_files(); diff --git a/src/util/sam_data.h b/src/util/sam_data.h index eb10ae8..9d6dc2c 100644 --- a/src/util/sam_data.h +++ b/src/util/sam_data.h @@ -100,6 +100,16 @@ struct SamData { const bam1_core_t& bc = bw->b->core; int i = 0; + // 先更新一下cigar信息,去除两端无效(长度为0)的cigar + if (bam_cigar_oplen(cigar[cigar_start]) == first_cigar_clip) { + first_cigar_clip = 0; + ++cigar_start; + } + if (bam_cigar_oplen(cigar[cigar_end - 1]) == last_cigar_clip) { + last_cigar_clip = 0; + --cigar_end; + } + // 计算ref_offset,就是相对比对的position,要将ref右移多少 for (i = 0; i < cigar_start; ++i) { if (BaseUtils::consumeRefBases(bam_cigar_opchr(cigar[i]))) { @@ -121,7 +131,7 @@ struct SamData { } // 计算read两端clip之后的softstart和softend,其实S之前都被切掉了 - int64_t softStart = bw->b->core.pos + ref_offset; + int64_t softStart = BamWrap::bam_global_pos(bw->b) + ref_offset; // 注意这里,要加上contig int64_t softEnd = softStart - 1; // 闭区间 bool rightTail = false; for (i = cigar_start; i < cigar_end; ++i) { @@ -150,7 +160,7 @@ struct SamData { start_pos += len; // 更新起始位置 } else if (i == cigar_end - 1 && c == 'D') { // 跳过结尾的deletion c = 'H'; - softEnd -= len; // 更新结束位置 + end_pos -= len; // 更新结束位置 } cigars.push_back({c, len}); } @@ -159,12 +169,13 @@ struct SamData { // 给定一个ref pos,返回对应的read index和cigar操作 ReadIdxCigar getReadIndexForReferenceCoordinate(int64_t refPos) { ReadIdxCigar rc; - if (refPos < start_pos) + int64_t contig_start_pos = BamWrap::bam_pos(start_pos); + if (refPos < contig_start_pos) return rc; int firstReadPosOfElement = 0; // inclusive - int firstRefPosOfElement = start_pos; // inclusive + int firstRefPosOfElement = contig_start_pos; // inclusive int lastReadPosOfElement = 0; // exclusive - int lastRefPosOfElement = start_pos; // exclusive + int lastRefPosOfElement = contig_start_pos; // exclusive // advance forward through all the cigar elements until we bracket the reference coordinate for (auto& cigar : cigars) { firstReadPosOfElement = lastReadPosOfElement;