/* Paired-end SAM generation. Besides the stock per-pair path (mem_sam_pe),
 * this file contains a batched mate-rescue pre-pass that collects mate
 * Smith-Waterman tasks from all reads and runs them with an AVX-512
 * inter-query kernel (ksw_align_avx512_u8). */
#include "paired_sam.h"

#include <assert.h>
#include <immintrin.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>

#include "ksw_align_avx.h"
#include "kvec.h"

/* Reference path: generate SAM records for read pair i with mem_sam_pe(),
 * then release the alignment regions of both reads of the pair. */
static void worker_sam(void* data, int i, int tid)
{
    mem_worker_t* w = (mem_worker_t*)data;
    if (bwa_verbose >= 4) printf("=====> Finalizing read pair '%s' <=====\n", w->seqs[i << 1 | 0].name);
    mem_sam_pe(w->opt, w->bns, w->pac, w->pes, (w->n_processed >> 1) + i, &w->seqs[i << 1], &w->regs[i << 1], &w->sams[i << 1], tid);
    free(w->regs[i << 1 | 0].a);
    free(w->regs[i << 1 | 1].a);
}

/* Infer the relative orientation of two hits b1 and b2 (positions in the
 * doubled forward+reverse coordinate space of length 2*l_pac).
 * Stores the distance between them, measured on b1's strand, in *dist and
 * returns an orientation index in [0, 3] used to select pes[]. */
static inline int mem_infer_dir(int64_t l_pac, int64_t b1, int64_t b2, int64_t* dist)
{
    int64_t p2;
    int r1 = (b1 >= l_pac), r2 = (b2 >= l_pac);
    p2 = r1 == r2 ? b2 : (l_pac << 1) - 1 - b2; // p2 is the coordinate of read 2 on the read 1 strand
    *dist = p2 > b1 ? p2 - b1 : b1 - p2;
    return (r1 == r2 ? 0 : 1) ^ (p2 > b1 ? 0 : 3);
}

/* Clamp the reference interval [*beg, *end) to the contig containing `mid`
 * (on the same strand as `mid`) and return that contig's id.
 * The endpoints are swapped first if given in reverse order. `pac` is unused. */
static int get_rid_range(const bntseq_t* bns, const uint8_t* pac, int64_t* beg, int64_t mid, int64_t* end)
{
    int64_t far_beg, far_end;
    int is_rev, rid;
    (void)pac;
    if (*end < *beg) { // make sure beg <= end
        int64_t tmp = *beg;
        *beg = *end;
        *end = tmp;
    }
    assert(*beg <= mid && mid < *end);
    rid = bns_pos2rid(bns, bns_depos(bns, mid, &is_rev));
    far_beg = bns->anns[rid].offset;
    far_end = far_beg + bns->anns[rid].len;
    if (is_rev) { // flip to the reverse strand
        int64_t tmp = far_beg;
        far_beg = (bns->l_pac << 1) - far_end;
        far_end = (bns->l_pac << 1) - tmp;
    }
    *beg = *beg > far_beg ? *beg : far_beg;
    *end = *end < far_end ? *end : far_end;
    return rid;
}

/* Fetch the reference bases in [beg, end). The returned buffer is heap
 * allocated by bns_get_seq() and must be freed by the caller. */
static inline uint8_t* get_ref_seq(const bntseq_t* bns, const uint8_t* pac, int64_t beg, int64_t end)
{
    int64_t len;
    return bns_get_seq(bns->l_pac, pac, beg, end, &len);
}

/* Decide whether mate `seq` needs a mate-rescue SW against the window implied
 * by anchor hit `a` and the insert-size statistics `pes`; if so, append one
 * task per missing orientation to msw8 (8-bit kernel) or msw16 (16-bit).
 * `ma` holds the mate's existing hits; `sid` is the mate's index in w->seqs.
 *
 * Tasks are NOT linked into seq->msw here: msw8/msw16 may be reallocated by
 * later pushes, which would leave such pointers dangling. Linking happens in
 * gather_matesw_task() once all tasks of the batch have been generated. */
static void get_matesw_data(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, const mem_pestat_t pes[4], const mem_alnreg_t* a, bseq1_t* seq, mem_alnreg_v* ma, int64_t sid, mem_stats_t* stats, matesw_data_v* msw8, matesw_data_v* msw16)
{
    int64_t l_pac = bns->l_pac;
    int l_ms = seq->l_seq;
    int i, r, skip[4], rid;
    stats->max_seq_len = max_(stats->max_seq_len, l_ms);
    for (r = 0; r < 4; ++r) skip[r] = pes[r].failed ? 1 : 0;
    for (i = 0; i < ma->n; ++i) { // check which orientation has been found
        int64_t dist;
        r = mem_infer_dir(l_pac, a->rb, ma->a[i].rb, &dist);
        if (dist >= pes[r].low && dist <= pes[r].high) skip[r] = 1;
    }
    if (skip[0] + skip[1] + skip[2] + skip[3] == 4) return; // consistent pair exist; no need to perform SW
    for (r = 0; r < 4; ++r) {
        int is_rev, is_larger;
        int64_t rb, re; // half-open window [rb, re)
        if (skip[r]) continue;
        is_rev = (r >> 1 != (r & 1)); // whether to reverse complement the mate
        is_larger = !(r >> 1);        // whether the mate has larger coordinate
        if (!is_rev) {
            rb = is_larger ? a->rb + pes[r].low : a->rb - pes[r].high;
            // if on the same strand, end position should be larger to make room for the seq length
            re = (is_larger ? a->rb + pes[r].high : a->rb - pes[r].low) + l_ms;
        } else {
            rb = (is_larger ? a->rb + pes[r].low : a->rb - pes[r].high) - l_ms; // similarly on opposite strands
            re = is_larger ? a->rb + pes[r].high : a->rb - pes[r].low;
        }
        if (rb < 0) rb = 0;
        if (re > l_pac << 1) re = l_pac << 1;
        // contig id of the window, with [rb, re) clamped to that contig
        rid = get_rid_range(bns, pac, &rb, (rb + re) >> 1, &re);
        if (a->rid == rid && re - rb >= opt->min_seed_len) { // no funny things happening
            int xtra = KSW_XSUBO | KSW_XSTART | (l_ms * opt->a < 250 ? KSW_XBYTE : 0) | (opt->min_seed_len * opt->a);
            matesw_data_t* p;
            stats->max_msw_ref_len = max_(stats->max_msw_ref_len, re - rb);
            // short reads fit the 8-bit score range; the rest use the 16-bit kernel
            if (xtra & KSW_XBYTE)
                p = kv_pushp(matesw_data_t, *msw8);
            else
                p = kv_pushp(matesw_data_t, *msw16);
            p->is_rev = is_rev;
            p->xtra = xtra;
            p->rb = rb;
            p->re = re;
            p->seq_id = sid;
        }
    }
}

/* Step 1 (per read pair idx): collect the mate-rescue tasks of both reads
 * into this thread's matesw_arr8/matesw_arr16. */
static void worker_matesw_data(void* data, int idx, int tid)
{
    mem_worker_t* w = (mem_worker_t*)data;
    const mem_opt_t* opt = w->opt;
    int64_t i_s = idx << 1;
    bseq1_t* s = &w->seqs[i_s];
    mem_alnreg_v* a = &w->regs[i_s];
    mem_alnreg_v b[2];
    int i, j;
    // drop task links left over from a previous batch, for BOTH reads of the pair
    s[0].msw.n = 0;
    s[1].msw.n = 0;
    kv_init(b[0]);
    kv_init(b[1]);
    // keep only hits scoring within pen_unpaired of each read's first hit
    for (i = 0; i < 2; ++i)
        for (j = 0; j < a[i].n; ++j)
            if (a[i].a[j].score >= a[i].a[0].score - opt->pen_unpaired) kv_push(mem_alnreg_t, b[i], a[i].a[j]);
    // hits of read i are anchors for rescuing read !i
    for (i = 0; i < 2; ++i)
        for (j = 0; j < b[i].n && j < opt->max_matesw; ++j)
            get_matesw_data(opt, w->bns, w->pac, w->pes, &b[i].a[j], &s[!i], &a[!i], i_s + !i, &w->mem_stats[tid], w->matesw_arr8[tid], w->matesw_arr16[tid]);
    free(b[0].a);
    free(b[1].a);
}

/* Step 2 (8-bit): run up to SIMD512_WIDTH8 mate-rescue tasks of chunk idx in
 * one inter-query SIMD call and store each result in its task's `aln`. */
static void worker_calc_matesw_avx512_u8(void* data, int idx, int tid)
{
    mem_worker_t* w = (mem_worker_t*)data;
    const mem_opt_t* opt = w->opt;
    matesw_buf_t* msw_buf = &w->msw_buf[tid]; // per-thread scratch buffers
    int startIdx = idx * SIMD512_WIDTH8;
    int endIdx = (idx + 1) * SIMD512_WIDTH8;
    int i, j, k;
    int maxSeqLen = 0, maxRefLen = 0;
    uint8_t* refArr[SIMD512_WIDTH8] = {0};
    uint8_t* seqArr[SIMD512_WIDTH8] = {0};
    int refLen[SIMD512_WIDTH8] = {0};
    int seqLen[SIMD512_WIDTH8] = {0};
    int seqQuantaLen[SIMD512_WIDTH8] = {0};
    int xtraA[SIMD512_WIDTH8] = {0};
    kswr_avx_t alns[SIMD512_WIDTH8] = {0};
    uint8_t* mySeq1SoA;
    uint8_t* mySeq2SoA;

    if (endIdx > (int)w->msw_tasks8.n) endIdx = (int)w->msw_tasks8.n;

    for (i = startIdx, j = 0; i < endIdx; ++i, ++j) {
        matesw_data_t* task = kv_A(w->msw_tasks8, i);
        int quanta;
        // 1. reference window (freed after the kernel call)
        refArr[j] = get_ref_seq(w->bns, w->pac, task->rb, task->re);
        refLen[j] = task->re - task->rb;
        maxRefLen = max_(maxRefLen, refLen[j]);
        // 2. query (mate) sequence
        seqArr[j] = (uint8_t*)w->seqs[task->seq_id].seq;
        seqLen[j] = w->seqs[task->seq_id].l_seq;
        quanta = (seqLen[j] + 16 - 1) / 16 * 16; // based on SSE-8 bit lane
        seqQuantaLen[j] = quanta;
        maxSeqLen = max_(maxSeqLen, quanta);
        // 3. per-task flags
        xtraA[j] = task->xtra;
    }
    // fewer than SIMD512_WIDTH8 tasks: pad the unused lanes with copies of lane 0
    for (; j < SIMD512_WIDTH8; ++j) {
        refArr[j] = refArr[0];
        refLen[j] = refLen[0];
        seqArr[j] = seqArr[0];
        seqLen[j] = seqLen[0];
        seqQuantaLen[j] = seqQuantaLen[0];
        xtraA[j] = xtraA[0];
    }

    // transpose ref and seq into structure-of-arrays layout: element k of lane j at [k * WIDTH + j]
    mySeq1SoA = msw_buf->refArr;
    mySeq2SoA = msw_buf->seqArr;
    for (j = 0; j < SIMD512_WIDTH8; ++j) {
        uint8_t* seq1 = refArr[j];
        uint8_t* seq2 = seqArr[j];
        // reference: bases, then 0xFF up to the longest reference in the chunk
        for (k = 0; k < refLen[j]; ++k) mySeq1SoA[k * SIMD512_WIDTH8 + j] = (seq1[k] == AMBIG_ ? AMBR : seq1[k]);
        for (; k < maxRefLen; ++k) mySeq1SoA[k * SIMD512_WIDTH8 + j] = 0xFF;
        // query: bases, DUMMY5 up to the 16-quantum, then 0xFF up to the longest query
        for (k = 0; k < seqLen[j]; ++k) mySeq2SoA[k * SIMD512_WIDTH8 + j] = (seq2[k] == AMBIG_ ? AMBR : seq2[k]);
        for (; k < seqQuantaLen[j]; ++k) mySeq2SoA[k * SIMD512_WIDTH8 + j] = DUMMY5;
        for (; k < maxSeqLen; ++k) mySeq2SoA[k * SIMD512_WIDTH8 + j] = 0xFF;
    }

    ksw_align_avx512_u8(opt->a, -1 * opt->b, opt->o_ins, opt->e_ins, opt->o_del, opt->e_del, msw_buf, mySeq1SoA, mySeq2SoA, maxRefLen, maxSeqLen, xtraA, refLen, alns, 0);

    // store results; padded lanes alias lane 0 and own nothing, so only real lanes are freed
    for (i = startIdx, j = 0; i < endIdx; ++i, ++j) {
        matesw_data_t* task = kv_A(w->msw_tasks8, i);
        task->aln = alns[j];
        free(refArr[j]);
    }
}

/* Step 2 (16-bit): TODO not implemented yet; 16-bit tasks are left uncomputed. */
static void worker_calc_matesw16(void* data, int i, int tid) {}

/* Step 3 (per read pair i): generate the SAM records.
 * NOTE(review): the precomputed matesw results in seqs[..].msw are not consumed
 * yet. Until they are, this delegates to the reference worker so SAM records
 * are actually produced; it previously only freed the regions and emitted
 * nothing. worker_sam() also frees both reads' regions. */
static void workder_gen_sam(void* data, int i, int tid)
{
    worker_sam(data, i, tid);
}

/* Gather the tasks from every thread's vector in msw_arr into `tasks` and link
 * each task to its sequence's msw list. This must run after all task
 * generation has finished, so the element pointers taken here are stable.
 * All tasks of a given sequence live in a single (thread, vector), so each
 * sequence's msw list keeps the order in which its tasks were generated. */
static void gather_matesw_task(mem_worker_t* w, matesw_data_v** msw_arr, matesw_ptr_v* tasks)
{
    int t;
    size_t j;
    tasks->n = 0;
    for (t = 0; t < w->opt->n_threads; ++t) {
        matesw_data_v* v = msw_arr[t];
        for (j = 0; j < v->n; ++j) {
            matesw_data_t* p = &kv_A(*v, j);
            kv_push(matesw_data_t*, *tasks, p);
            kv_push(matesw_data_t*, w->seqs[p->seq_id].msw, p);
        }
    }
}

/* Propagate the batch-wide maxima (query length, rescue window length) to every
 * thread's stats so every per-thread buffer fits the largest task.
 * The raw, unpadded maximum is stored. Storing the padded value fed it back
 * into the next batch's max, so the size grew by 16 on every batch. */
static void update_mem_stats(mem_worker_t* w)
{
    int i, max_seq_len = 0, max_ref_len = 0;
    for (i = 0; i < w->opt->n_threads; ++i) {
        max_seq_len = max_(max_seq_len, w->mem_stats[i].max_seq_len);
        max_ref_len = max_(max_ref_len, w->mem_stats[i].max_msw_ref_len);
    }
    for (i = 0; i < w->opt->n_threads; ++i) {
        w->mem_stats[i].max_seq_len = max_seq_len;
        w->mem_stats[i].max_msw_ref_len = max_ref_len;
    }
}

/* Query-side buffer length for a raw maximum query length: round up to a
 * multiple of 16 (SSE 8-bit lane quantum) plus one spare column. */
static inline int matesw_seq_buf_len(int max_seq_len)
{
    return (max_seq_len + 16 - 1) / 16 * 16 + 1;
}

/* 64-byte aligned allocation for SIMD buffers; aborts on failure. */
static void* matesw_simd_alloc(size_t n_bytes)
{
    void* p = _mm_malloc(n_bytes, 64);
    if (p == NULL) {
        fprintf(stderr, "[E::matesw_simd_alloc] failed to allocate %zu bytes\n", n_bytes);
        abort();
    }
    return p;
}

/* Grow (never shrink) each thread's SIMD scratch buffers to hold
 * SIMD512_WIDTH8 lanes of the current maximum reference/query lengths. */
static void alloc_update_cache_avx512(mem_worker_t* w)
{
    int i;
    for (i = 0; i < w->opt->n_threads; ++i) {
        matesw_buf_t* b = &w->msw_buf[i];
        int need_seq_len = matesw_seq_buf_len(w->mem_stats[i].max_seq_len);
        // buffers sized by the reference window length
        if (b->ref_len < w->mem_stats[i].max_msw_ref_len) {
            size_t n_bytes;
            b->ref_len = w->mem_stats[i].max_msw_ref_len;
            n_bytes = (size_t)b->ref_len * SIMD512_WIDTH8 * sizeof(uint8_t);
            if (b->refArr) _mm_free(b->refArr);
            if (b->rowMax) _mm_free(b->rowMax);
            b->refArr = (uint8_t*)matesw_simd_alloc(n_bytes);
            b->rowMax = (uint8_t*)matesw_simd_alloc(n_bytes);
        }
        // buffers sized by the (padded) query length
        if (b->seq_len < need_seq_len) {
            size_t n_bytes;
            b->seq_len = need_seq_len;
            n_bytes = (size_t)b->seq_len * SIMD512_WIDTH8 * sizeof(uint8_t);
            if (b->seqArr) _mm_free(b->seqArr);
            if (b->H0) _mm_free(b->H0);
            if (b->H1) _mm_free(b->H1);
            if (b->Hmax) _mm_free(b->Hmax);
            if (b->F) _mm_free(b->F);
            b->seqArr = (uint8_t*)matesw_simd_alloc(n_bytes);
            b->H0 = (uint8_t*)matesw_simd_alloc(n_bytes);
            b->H1 = (uint8_t*)matesw_simd_alloc(n_bytes);
            b->Hmax = (uint8_t*)matesw_simd_alloc(n_bytes);
            b->F = (uint8_t*)matesw_simd_alloc(n_bytes);
        }
    }
}

/* Generate SAM records for a batch of paired-end reads (w->n_reads reads,
 * i.e. n_reads/2 pairs). With MEM_F_NO_RESCUE the reference per-pair path is
 * used directly; otherwise mate-rescue tasks are collected, batched and run
 * with the SIMD kernel before the SAM records are generated. */
void gen_paired_sam(mem_worker_t* w)
{
    int i;
    for (i = 0; i < w->opt->n_threads; ++i) { // drop the previous batch's tasks
        w->matesw_arr8[i]->n = 0;
        w->matesw_arr16[i]->n = 0;
    }
    if (w->opt->flag & MEM_F_NO_RESCUE) {
        kt_for(w->opt->n_threads, worker_sam, w, w->n_reads >> 1); // generate alignment
        return;
    }

    // 1. find which reads need mate rescue
    PROF_START(get_matesw_data);
    kt_for(w->opt->n_threads, worker_matesw_data, w, w->n_reads >> 1);
    PROF_END(gprof[G_get_matesw_data], get_matesw_data);

    // size the per-thread buffers for this batch
    PROF_START(update_stats_cache);
    update_mem_stats(w);
    alloc_update_cache_avx512(w);
    PROF_END(gprof[G_update_stats_cache], update_stats_cache);

    // 2. batch and run the mate-rescue SW tasks
    PROF_START(gather_matesw_task);
    gather_matesw_task(w, w->matesw_arr8, &w->msw_tasks8);
    gather_matesw_task(w, w->matesw_arr16, &w->msw_tasks16);
    PROF_END(gprof[G_gather_matesw_task], gather_matesw_task);

    PROF_START(calc_matesw);
    if (w->msw_tasks8.n > 0) kt_for(w->opt->n_threads, worker_calc_matesw_avx512_u8, w, (w->msw_tasks8.n + SIMD512_WIDTH8 - 1) / SIMD512_WIDTH8);
    if (w->msw_tasks16.n > 0) kt_for(w->opt->n_threads, worker_calc_matesw16, w, (w->msw_tasks16.n + SIMD512_WIDTH16 - 1) / SIMD512_WIDTH16);
    PROF_END(gprof[G_calc_matesw], calc_matesw);

    // 3. generate the SAM records
    kt_for(w->opt->n_threads, workder_gen_sam, w, w->n_reads >> 1);

    if (bwa_verbose >= 4)
        fprintf(stderr, "[gen_paired_sam] matesw tasks: 8-bit %zu, 16-bit %zu\n", (size_t)w->msw_tasks8.n, (size_t)w->msw_tasks16.n);
}