又修复了一个bug:之前有些read漏掉了过滤,把过滤函数放到append_one_bam里之后就好了。不过现在发现串行和并行的结果还是有点不一致,正在调试。
This commit is contained in:
parent
f915461205
commit
81cbd6831c
|
|
@ -252,17 +252,15 @@ int SerialBQSR(AuxVar &aux) {
|
||||||
skips[ii] = skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) ||
|
skips[ii] = skips[ii] || (ContextCovariate::baseIndexMap[sd.bases[ii]] == -1) ||
|
||||||
sd.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN;
|
sd.base_quals[ii] < nsgv::gBqsrArg.PRESERVE_QSCORES_LESS_THAN;
|
||||||
}
|
}
|
||||||
|
//stringstream ss;
|
||||||
|
//for (auto s : skips) ss << (int)s << ' ';
|
||||||
|
//spdlog::info("{}", ss.str());
|
||||||
PROF_GP_END(read_vcf);
|
PROF_GP_END(read_vcf);
|
||||||
#if 0
|
|
||||||
int fidx = 0;
|
// fprintf(gf[4], "%s %d %ld ", bam_get_qname(sd.bw->b), sd.bw->b->core.flag, sd.rid);
|
||||||
if (sd.rid % 2 == 0)
|
// for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[4], "%d ", skips[ii] ? 1 : 0);
|
||||||
fidx = 0;
|
// fprintf(gf[4], "\n");
|
||||||
else
|
|
||||||
fidx = 1;
|
|
||||||
fprintf(gf[fidx], "%ld %d\t", sd.rid, sd.read_len);
|
|
||||||
for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[fidx], "%d ", skips[ii] ? 1 : 0);
|
|
||||||
fprintf(gf[fidx], "\n");
|
|
||||||
#endif
|
|
||||||
// fprintf(gf[0], "%ld %d\t", sd.rid, sd.read_len);
|
// fprintf(gf[0], "%ld %d\t", sd.rid, sd.read_len);
|
||||||
// for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[0], "%d ", skips[ii] ? 1 : 0);
|
// for (int ii = 0; ii < sd.read_len; ++ii) fprintf(gf[0], "%d ", skips[ii] ? 1 : 0);
|
||||||
// fprintf(gf[0], "\n");
|
// fprintf(gf[0], "\n");
|
||||||
|
|
@ -306,7 +304,8 @@ int SerialBQSR(AuxVar &aux) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 多线程处理bam数据, tmd是乱序的?
|
// 多线程处理bam数据, tmd是乱序的?
|
||||||
static void thread_worker(void* data, long idx, int tid, int steal) {
|
// static void thread_worker(void* data, long idx, int tid, int steal) {
|
||||||
|
static void thread_worker(void* data, long idx, int tid) {
|
||||||
AuxVar& aux = (*(vector<AuxVar>*)data)[tid];
|
AuxVar& aux = (*(vector<AuxVar>*)data)[tid];
|
||||||
auto& readCovariates = aux.readCovariates;
|
auto& readCovariates = aux.readCovariates;
|
||||||
RecalTables& recalTables = aux.recalTables;
|
RecalTables& recalTables = aux.recalTables;
|
||||||
|
|
@ -316,8 +315,7 @@ static void thread_worker(void* data, long idx, int tid, int steal) {
|
||||||
StableArray<double>&snpErrors = aux.snpErrors, &insErrors = aux.insErrors, &delErrors = aux.delErrors;
|
StableArray<double>&snpErrors = aux.snpErrors, &insErrors = aux.insErrors, &delErrors = aux.delErrors;
|
||||||
StableArray<uint8_t>& skips = aux.skips; // 该位置是否是已知位点
|
StableArray<uint8_t>& skips = aux.skips; // 该位置是否是已知位点
|
||||||
auto &bams = *aux.bamArr;
|
auto &bams = *aux.bamArr;
|
||||||
if (steal)
|
// if (steal) for (auto& vcf : aux.vcfArr) vcf.knownSites.clear();
|
||||||
for (auto& vcf : aux.vcfArr) vcf.knownSites.clear();
|
|
||||||
#if 1
|
#if 1
|
||||||
int startIdx = idx * aux.BAM_BLOCK_NUM;
|
int startIdx = idx * aux.BAM_BLOCK_NUM;
|
||||||
int stopIdx = std::min((size_t)(idx + 1) * aux.BAM_BLOCK_NUM, bams.size());
|
int stopIdx = std::min((size_t)(idx + 1) * aux.BAM_BLOCK_NUM, bams.size());
|
||||||
|
|
@ -407,7 +405,7 @@ int ParallelBQSR(vector<AuxVar>& auxArr) {
|
||||||
spdlog::info("{} reads processed in {} round", readNum, round);
|
spdlog::info("{} reads processed in {} round", readNum, round);
|
||||||
|
|
||||||
#if 1
|
#if 1
|
||||||
kt_for_steal(auxArr.size(), thread_worker, &auxArr, (readNum + AuxVar::BAM_BLOCK_NUM - 1) / AuxVar::BAM_BLOCK_NUM);
|
kt_for_no_steal(auxArr.size(), thread_worker, &auxArr, (readNum + AuxVar::BAM_BLOCK_NUM - 1) / AuxVar::BAM_BLOCK_NUM);
|
||||||
#else
|
#else
|
||||||
kt_for_steal(auxArr.size(), thread_worker, &auxArr, auxArr.size());
|
kt_for_steal(auxArr.size(), thread_worker, &auxArr, auxArr.size());
|
||||||
#endif
|
#endif
|
||||||
|
|
|
||||||
|
|
@ -18,8 +18,7 @@ int BamBuf::ReadBam() {
|
||||||
if (handle_last) { // 处理上次读入的最后一个bam
|
if (handle_last) { // 处理上次读入的最后一个bam
|
||||||
if (has_enough_space()) { // 必须调用,在边界处调整memffset
|
if (has_enough_space()) { // 必须调用,在边界处调整memffset
|
||||||
if (filter_out == nullptr || !filter_out(bw->b)) { // 这里也要加过滤器
|
if (filter_out == nullptr || !filter_out(bw->b)) { // 这里也要加过滤器
|
||||||
++read_num;
|
read_num += append_one_bam();
|
||||||
append_one_bam();
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return read_num; // 还是没空间
|
return read_num; // 还是没空间
|
||||||
|
|
@ -28,11 +27,9 @@ int BamBuf::ReadBam() {
|
||||||
while (read_stat_ >= 0 && (read_stat_ = sam_read1(fp, hdr, bw->b)) >= 0) {
|
while (read_stat_ >= 0 && (read_stat_ = sam_read1(fp, hdr, bw->b)) >= 0) {
|
||||||
bw->end_pos_ = BamWrap::BamEndPos(bw->b);
|
bw->end_pos_ = BamWrap::BamEndPos(bw->b);
|
||||||
if (has_enough_space()) { // 还有空间
|
if (has_enough_space()) { // 还有空间
|
||||||
// if (true) { // 还有空间
|
|
||||||
// 加过滤器
|
// 加过滤器
|
||||||
if (filter_out == nullptr || !filter_out(bw->b)) {
|
if (filter_out == nullptr || !filter_out(bw->b)) {
|
||||||
append_one_bam();
|
read_num += append_one_bam(); // 放进缓存才算读取到
|
||||||
++read_num; // 放进缓存才算读取到
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
|
|
@ -110,8 +107,10 @@ inline bool BamBuf::has_enough_space() {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 处理一个读取后的bam
|
// 处理一个读取后的bam
|
||||||
inline void BamBuf::append_one_bam() {
|
inline int BamBuf::append_one_bam() {
|
||||||
BamWrap *bwp = (BamWrap *)(mem + mem_offset);
|
if (filter_out != nullptr && filter_out(bw->b))
|
||||||
|
return 0;
|
||||||
|
BamWrap* bwp = (BamWrap*)(mem + mem_offset);
|
||||||
*bwp = *bw;
|
*bwp = *bw;
|
||||||
bwp->b = (bam1_t *)((char *)bwp + sizeof(*bwp));
|
bwp->b = (bam1_t *)((char *)bwp + sizeof(*bwp));
|
||||||
bam1_t *bp = bwp->b;
|
bam1_t *bp = bwp->b;
|
||||||
|
|
@ -121,6 +120,7 @@ inline void BamBuf::append_one_bam() {
|
||||||
// 更新下次存储的位置
|
// 更新下次存储的位置
|
||||||
mem_offset = (mem_offset + bw->length() + 8 - 1) & ~((size_t)(8 - 1));
|
mem_offset = (mem_offset + bw->length() + 8 - 1) & ~((size_t)(8 - 1));
|
||||||
bv.push_back(bwp);
|
bv.push_back(bwp);
|
||||||
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 处理上次读入的最后一个read
|
// 处理上次读入的最后一个read
|
||||||
|
|
|
||||||
|
|
@ -85,7 +85,7 @@ struct BamBuf {
|
||||||
// 检查缓存是否还有空间
|
// 检查缓存是否还有空间
|
||||||
bool has_enough_space();
|
bool has_enough_space();
|
||||||
// 处理一个读取后的bam
|
// 处理一个读取后的bam
|
||||||
void append_one_bam();
|
int append_one_bam();
|
||||||
// 处理上次读入的最后一个read
|
// 处理上次读入的最后一个read
|
||||||
bool handle_last_read();
|
bool handle_last_read();
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue