BWA-FastAlign/paired_sam.c

807 lines
32 KiB
C
Raw Permalink Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

#include "paired_sam.h"
#include <math.h>
#include <stdint.h>
#include <stdio.h>
#include "ksw_align_avx.h"
#include "kvec.h"
#include "debug.h"
#include "mate_sw.h"
#include "profiling.h"
// 原版生成sam函数
static void worker_sam(void* data, long i, int tid) {
mem_worker_t* w = (mem_worker_t*)data;
if (bwa_verbose >= 4)
printf("=====> Finalizing read pair '%s' <=====\n", w->seqs[i << 1 | 0].name);
mem_sam_pe(w->opt, w->bns, w->pac, w->pes, (w->n_processed >> 1) + i, &w->seqs[i << 1], &w->regs[i << 1], &w->sams[i << 1], tid);
free(w->regs[i << 1 | 0].a);
free(w->regs[i << 1 | 1].a);
}
// 应该是推测read的方向正链还是反向互补等等
static inline int mem_infer_dir(int64_t l_pac, int64_t b1, int64_t b2, int64_t* dist) {
int64_t p2;
int r1 = (b1 >= l_pac), r2 = (b2 >= l_pac);
p2 = r1 == r2 ? b2 : (l_pac << 1) - 1 - b2; // p2 is the coordinate of read 2 on the read 1 strand
*dist = p2 > b1 ? p2 - b1 : b1 - p2;
return (r1 == r2 ? 0 : 1) ^ (p2 > b1 ? 0 : 3);
}
// 根据ref的begin和end计算对应的rid和修正之后的end
static int get_rid_range(const bntseq_t* bns, const uint8_t* pac, int64_t* beg, int64_t mid, int64_t* end) {
int64_t far_beg, far_end;
int is_rev;
int rid;
if (*end < *beg)
*end ^= *beg, *beg ^= *end, *end ^= *beg; // if end is smaller, swap
assert(*beg <= mid && mid < *end);
rid = bns_pos2rid(bns, bns_depos(bns, mid, &is_rev));
far_beg = bns->anns[rid].offset;
far_end = far_beg + bns->anns[rid].len;
if (is_rev) { // flip to the reverse strand
int64_t tmp = far_beg;
far_beg = (bns->l_pac << 1) - far_end;
far_end = (bns->l_pac << 1) - tmp;
}
*beg = *beg > far_beg ? *beg : far_beg;
*end = *end < far_end ? *end : far_end;
return rid;
}
// 获取ref序列
static inline uint8_t* get_ref_seq(const bntseq_t* bns, const uint8_t* pac, int64_t beg, int64_t end, byte_v *buf) {
bns_get_seq_no_alloc(bns->l_pac, pac, beg, end, &buf->n, &buf->m, &buf->a);
return buf->a;
}
// 将query序列放入缓存
static inline void get_query_seq(int is_rev, int l_seq, uint8_t *seq, byte_v *query) {
int i = 0;
if (query->m < l_seq) {
kv_resize(uint8_t, *query, l_seq);
}
query->n = l_seq;
if (is_rev) {
for (i = 0; i < l_seq; ++i) query->a[l_seq - 1 - i] = seq[i] < 4 ? 3 - seq[i] : 4;
} else {
for (i = 0; i < l_seq; ++i) query->a[i] = seq[i];
}
}
// 翻转seq
static inline void revseq(int l, uint8_t* s) {
int i, t;
for (i = 0; i < l >> 1; ++i) t = s[i], s[i] = s[l - 1 - i], s[l - 1 - i] = t;
}
// 计算当前seq是否需要做matesw需要的话保存必要的数据
static void get_matesw_tasks(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, const mem_pestat_t pes[4], const mem_alnreg_t* a, int a_j,
bseq1_t* seq, mem_alnreg_v* ma, int64_t sid, msw_stats_t* stats, msw_task_v* msw_tasks, int64_t n_processed, int tid) {
int64_t l_pac = bns->l_pac;
int l_ms = seq->l_seq;
int i, r, skip[4], rid;
stats->max_seq_len = max_(stats->max_seq_len, l_ms);
for (r = 0; r < 4; ++r) skip[r] = pes[r].failed ? 1 : 0;
for (i = 0; i < ma->n; ++i) { // check which orinentation has been found
int64_t dist;
r = mem_infer_dir(l_pac, a->rb, ma->a[i].rb, &dist);
if (dist >= pes[r].low && dist <= pes[r].high)
skip[r] = 1;
}
if (skip[0] + skip[1] + skip[2] + skip[3] == 4)
return; // consistent pair exist; no need to perform SW
int task_order = 0;
for (r = 0; r < 4; ++r) {
uint8_t is_rev, is_larger;
int64_t rb, re; // 左闭右开
if (skip[r])
continue;
is_rev = (r >> 1 != (r & 1)); // whether to reverse complement the mate
is_larger = !(r >> 1); // whether the mate has larger coordinate
if (!is_rev) {
rb = is_larger ? a->rb + pes[r].low : a->rb - pes[r].high;
// if on the same strand, end position should be larger to make room for the seq length
re = (is_larger ? a->rb + pes[r].high : a->rb - pes[r].low) + l_ms;
} else {
rb = (is_larger ? a->rb + pes[r].low : a->rb - pes[r].high) - l_ms; // similarly on opposite strands
re = is_larger ? a->rb + pes[r].high : a->rb - pes[r].low;
}
if (rb < 0)
rb = 0;
if (re > l_pac << 1)
re = l_pac << 1;
rid = get_rid_range(bns, pac, &rb, (rb + re) >> 1, &re); // 计算ref对应的染色体id和区间起始终止位置
if (a->rid == rid && re - rb >= opt->min_seed_len) { // no funny things happening
tdat[TD_MSW_CNT][tid] += 1;
stats->max_ref_len = max_(stats->max_ref_len, re - rb);
// fprintf(stderr, "zzh here\n");
int xtra = KSW_XSUBO | KSW_XSTART | (l_ms * opt->a < 250 ? KSW_XBYTE : 0) | (opt->min_seed_len * opt->a);
msw_task_t* p;
int msw_idx = 0; // u8
if (!(xtra & KSW_XBYTE))
msw_idx = 1; // i16
msw_task_v* task_arr = &msw_tasks[msw_idx];
p = kv_pushp(msw_task_t, *task_arr);
p->is_rev = is_rev;
p->skip = skip[0] | skip[1] << 1 | skip[2] << 2 | skip[3] << 3;
p->xtra = xtra;
p->rb = rb;
p->re = re;
p->seq_id = sid;
p->aj = a_j;
p->to = task_order++; // 有啥用? 应该是用来合并u8和i16的时候排序用
msw_seq_task_t seq_task = {tid, msw_idx, task_arr->n - 1};
kv_push(msw_seq_task_t, seq->msw_task, seq_task);
// 将matesw任务和对应的seq关联起来这里放指针是不行的因为指针位置会变要保存offset才行
}
}
}
// 先计算哪些read需要做matesw
static void worker_matesw_tasks(void* data, long idx, int tid) {
mem_worker_t* w = (mem_worker_t*)data;
const mem_opt_t* opt = w->opt;
int startIdx = idx * opt->batch_size;
int endIdx = (idx + 1) * opt->batch_size;
if (endIdx > w->n_reads)
endIdx = w->n_reads;
int id = 0;
mem_alnreg_v b[2];
int_v aj[2];
kv_init(aj[0]);
kv_init(aj[1]);
kv_init(b[0]);
kv_init(b[1]);
for (id = startIdx; id < endIdx; id += 2) {
int64_t i_s = id;
bseq1_t* s = &w->seqs[i_s];
mem_alnreg_v* a = &w->regs[i_s];
//s->msw.n = 0; // 清空之前的matesw数据
int i, j;
_clear_vec(b[0]);
_clear_vec(b[1]);
_clear_vec(aj[0]);
_clear_vec(aj[1]);
// fprintf(stderr, "zzh test\n");
for (i = 0; i < 2; ++i)
for (j = 0; j < a[i].n; ++j)
if (a[i].a[j].score >= a[i].a[0].score - opt->pen_unpaired) {
kv_push(mem_alnreg_t, b[i], a[i].a[j]);
kv_push(int, aj[i], j);
}
for (i = 0; i < 2; ++i)
for (j = 0; j < b[i].n && j < opt->max_matesw; ++j)
get_matesw_tasks(opt, w->bns, w->pac, w->pes, &b[i].a[j], aj[i].a[j], &s[!i], &a[!i], i_s + !i, &w->msw->t_msw_stats[tid],
w->msw->t_msw_tasks[tid], w->n_processed, tid);
}
free(b[0].a);
free(b[1].a);
free(aj[0].a);
free(aj[1].a);
}
static void msw_avx512_u8(const mem_opt_t* opt, int num_tasks, msw_buf_t* msw_buf, msw_ref_v* refs, msw_seq_v* seqs, int_v* xtras,
msw_kswr_v* kswrs, int phase, int tid) {
int i = 0, j = 0, k = 0;
int refLen[SIMD512_WIDTH8] = {0};
// 准备数据并进行计算
for (i = 0; i < num_tasks; i += SIMD512_WIDTH8) {
// 将ref和seq赋值给对应的用来计算的缓存
uint8_t* mySeq1SoA = msw_buf->refArr;
uint8_t* mySeq2SoA = msw_buf->seqArr;
int maxSeqLen = 0, maxRefLen = 0;
// for test
#if 0
for (j = 0; j < SIMD512_WIDTH8; ++j) {
fprintf(stderr, "r: %d, s: %d\n", refs->a[i + j].l_ref, seqs->a[i + j].l_seq);
fflush(stderr);
}
#endif
PROF_START(msw_pack_seq);
// 处理ref
for (j = 0; j < SIMD512_WIDTH8; ++j) {
msw_ref_t* ref = &refs->a[i + j];
uint8_t* seq1 = ref->ref;
refLen[j] = ref->l_ref;
// 处理ref
for (k = 0; k < ref->l_ref; ++k) {
mySeq1SoA[k * SIMD512_WIDTH8 + j] = (seq1[k] == AMBIG_ ? AMBR : seq1[k]);
}
maxRefLen = max_(maxRefLen, ref->l_ref);
}
for (j = 0; j < SIMD512_WIDTH8; ++j) {
msw_ref_t* ref = &refs->a[i + j];
for (k = ref->l_ref; k <= maxRefLen; ++k) {
mySeq1SoA[k * SIMD512_WIDTH8 + j] = 0xFF;
}
}
// 处理query
for (j = 0; j < SIMD512_WIDTH8; ++j) {
msw_seq_t* seq = &seqs->a[i + j];
uint8_t* seq2 = seq->seq;
int quanta = ((seq->l_seq + 16 - 1) / 16) * 16; // based on SSE-8 bit lane
for (k = 0; k < seq->l_seq; k++) {
mySeq2SoA[k * SIMD512_WIDTH8 + j] = (seq2[k] == AMBIG_ ? AMBQ : seq2[k]);
}
for (; k < quanta; ++k) {
mySeq2SoA[k * SIMD512_WIDTH8 + j] = DUMMY5; // SSE quanta
}
maxSeqLen = max_(maxSeqLen, quanta);
}
for (j = 0; j < SIMD512_WIDTH8; ++j) {
msw_seq_t* seq = &seqs->a[i + j];
int quanta = ((seq->l_seq + 16 - 1) / 16) * 16; // based on SSE-8 bit lane
for (k = quanta; k <= maxSeqLen; k++) {
mySeq2SoA[k * SIMD512_WIDTH8 + j] = 0xFF;
}
}
PROF_END(tprof[T_MSW_PACK_SEQ][tid], msw_pack_seq);
if (phase == 0)
tdat[TD_ALIGN_1_CNT][tid] += 1;
else
tdat[TD_ALIGN_2_CNT][tid] += 1;
// 利用smid指令计算
ksw_align_avx512_u8(opt->a, -1 * opt->b, opt->o_ins, opt->e_ins, opt->o_del, opt->e_del, msw_buf, mySeq1SoA, mySeq2SoA, maxRefLen, maxSeqLen,
&xtras->a[i], refLen, &kswrs->a[i], phase);
}
}
// 再多线程计算matesw利用inter-query的simd并行
static void worker_calc_matesw_avx512_u8(void* data, long idx, int tid) {
mem_worker_t* w = (mem_worker_t*)data;
const mem_opt_t* opt = w->opt;
int startIdx = idx * w->opt->msw_batch_size;
int endIdx = (idx + 1) * w->opt->msw_batch_size;
if (endIdx > w->msw->p_msw_tasks_u8.n)
endIdx = w->msw->p_msw_tasks_u8.n;
int roundEndIdx = ((endIdx + SIMD512_WIDTH8 - 1) / SIMD512_WIDTH8) * SIMD512_WIDTH8;
int i = 0, j = 0, k = 0;
msw_buf_t* msw_buf = &w->msw->t_msw_buf[tid]; // 缓冲区
byte_vv* ref_bufs = &w->msw->t_msw_ref_buf[tid];
byte_vv* seq_bufs = &w->msw->t_msw_seq_buf[tid];
msw_ref_v* refs = &w->msw->t_msw_1_refs[tid];
msw_seq_v* seqs = &w->msw->t_msw_1_seqs[tid];
int_v* xtras = &w->msw->t_msw_1_xtras[tid];
msw_kswr_v* kswrs = &w->msw->t_msw_1_kswrs[tid];
// 获取ref和seq数据
PROF_START(msw_get_ref);
for (i = startIdx, j = 0; i < endIdx; ++i, ++j) {
msw_task_t* task = w->msw->p_msw_tasks_u8.a[i];
// 1. 获取对应的ref
byte_v* ref_buf = &ref_bufs->a[j];
get_ref_seq(w->bns, w->pac, task->rb, task->re, ref_buf);
refs->a[j].l_ref = ref_buf->n;
refs->a[j].ref = ref_buf->a;
// 2. 获取对应的seq
byte_v* seq_buf = &seq_bufs->a[j];
get_query_seq(task->is_rev, w->seqs[task->seq_id].l_seq, (uint8_t*)w->seqs[task->seq_id].seq, seq_buf);
seqs->a[j].l_seq = seq_buf->n;
seqs->a[j].seq = seq_buf->a;
// 3. 其他数据
xtras->a[j] = task->xtra;
kswrs->a[j].tb = kswrs->a[j].qb = -1;
}
for (; i < roundEndIdx; ++i, ++j) {
refs->a[j].l_ref = 0;
seqs->a[j].l_seq = 0;
xtras->a[j] = 0;
}
PROF_END(tprof[T_MSW_GET_REF][tid], msw_get_ref);
PROF_START(msw_1);
msw_avx512_u8(opt, endIdx - startIdx, msw_buf, refs, seqs, xtras, kswrs, 0, tid); // 第一阶段
PROF_END(tprof[T_MSW_1][tid], msw_1);
// 第二阶段计算
msw_ref_v* refs2 = &w->msw->t_msw_2_refs[tid];
msw_seq_v* seqs2 = &w->msw->t_msw_2_seqs[tid];
int_v* xtras2 = &w->msw->t_msw_2_xtras[tid];
msw_kswr_v* kswrs2 = &w->msw->t_msw_2_kswrs[tid];
int_v msw_task_ids = {0}; // 用于第二阶段任务统计
// 保存结果并筛选第二阶段msw计算的任务
for (i = startIdx, j = 0, k = 0; i < endIdx; ++i, ++j) {
msw_task_t* task = w->msw->p_msw_tasks_u8.a[i];
kswr_avx_t r = kswrs->a[j];
task->aln = r;
// 打印测试
// kswr_avx_t aln = r;
//fprintf(gf[0], "id-%ld score-%d te-%d qe-%d score2-%d te2-%d tb-%d qb-%d\n", task->seq_id + w->n_processed, aln.score, aln.te, aln.qe,
// aln.score2, aln.te2, aln.tb, aln.qb);
if ((task->xtra & KSW_XSTART) == 0 || ((task->xtra & KSW_XSUBO) && r.score < (task->xtra & 0xffff)))
continue;
task->xtra = KSW_XSTOP | r.score;
kv_push(int, msw_task_ids, i);
// 1. 获取对应的ref
refs2->a[k] = refs->a[j];
revseq(r.te + 1, refs2->a[k].ref);
// refs2->a[k].l_ref = r.te + 1; // zzh add, for test
// 2. 获取对应的seq
seqs2->a[k] = seqs->a[j];
revseq(r.qe + 1, seqs2->a[k].seq);
seqs2->a[k].l_seq = r.qe + 1;
// 3. 其他数据
xtras2->a[k] = task->xtra;
kswrs2->a[k] = r;
++k;
}
roundEndIdx = ((msw_task_ids.n + SIMD512_WIDTH8 - 1) / SIMD512_WIDTH8) * SIMD512_WIDTH8;
for (; k < roundEndIdx; ++k) {
refs2->a[k].l_ref = 0;
seqs2->a[k].l_seq = 0;
}
PROF_START(msw_2);
msw_avx512_u8(opt, msw_task_ids.n, msw_buf, refs2, seqs2, xtras2, kswrs2, 1, tid); // 第二阶段
PROF_END(tprof[T_MSW_2][tid], msw_2);
// 结果赋值
for (k = 0; k < msw_task_ids.n; ++k) {
i = msw_task_ids.a[k];
msw_task_t* task = w->msw->p_msw_tasks_u8.a[i];
task->aln = kswrs2->a[k];
// kswr_avx_t aln = task->aln;
//fprintf(gf[1], "id-%ld score-%d te-%d qe-%d score2-%d te2-%d tb-%d qb-%d\n", task->seq_id + w->n_processed, aln.score, aln.te, aln.qe,
// aln.score2, aln.te2, aln.tb, aln.qb);
}
kv_destroy(msw_task_ids);
}
static void worker_calc_matesw_avx512_i16(void* data, long idx, int tid) {}
// 检测添加新的align
static int check_add_align(const mem_opt_t* opt, kswr_avx_t aln, int is_rev, int64_t l_pac, mem_alnreg_t* a, int l_ms, uint8_t* ms,
mem_alnreg_v* ma, int64_t rb) {
int res = 0;
if (aln.score >= opt->min_seed_len && aln.qb >= 0) { // something goes wrong if aln.qb < 0
mem_alnreg_t b;
memset(&b, 0, sizeof(mem_alnreg_t));
b.rid = a->rid;
b.is_alt = a->is_alt;
b.qb = is_rev ? l_ms - (aln.qe + 1) : aln.qb;
b.qe = is_rev ? l_ms - aln.qb : aln.qe + 1;
b.rb = is_rev ? (l_pac << 1) - (rb + aln.te + 1) : rb + aln.tb;
b.re = is_rev ? (l_pac << 1) - (rb + aln.tb) : rb + aln.te + 1;
b.score = aln.score;
b.csub = aln.score2;
b.secondary = -1;
b.seedcov = (b.re - b.rb < b.qe - b.qb ? b.re - b.rb : b.qe - b.qb) >> 1;
// printf("*** %d, [%lld,%lld], %d:%d, (%lld,%lld), (%lld,%lld) == (%lld,%lld)\n", aln.score, rb, re, is_rev,
// is_larger, a->rb, a->re, ma->a[0].rb, ma->a[0].re, b.rb, b.re);
kv_push(mem_alnreg_t, *ma, b); // make room for a new element
#ifdef SAM_EXACT
// move b s.t. ma is sorted
int i, tmp;
for (i = 0; i < ma->n - 1; ++i) // find the insertion point
if (ma->a[i].score < b.score)
break;
tmp = i;
for (i = ma->n - 1; i > tmp; --i) ma->a[i] = ma->a[i - 1];
ma->a[i] = b;
ma->n = mem_sort_dedup_patch(opt, 0, 0, 0, ma->n, ma->a);
#endif
res = 1;
}
return res;
}
#define raw_mapq(diff, a) ((int)(6.02 * (diff) / (a) + .499))
// 根据比对结果生成sam
void generate_sam(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, const mem_pestat_t pes[4], uint64_t id, bseq1_t s[2], mem_alnreg_v a[2],
seq_sam_t ss[2], int64_t n_processed, int tid) {
int i, j, z[2], o, subo, n_sub, extra_flag = 1, n_pri[2], n_aa[2];
kstring_t str;
mem_aln_t h[2], g[2], aa[2][2];
// int cmp = strcmp("ERR194147.17699", s[0].name);
str.l = str.m = 0;
str.s = 0;
memset(h, 0, sizeof(mem_aln_t) * 2);
memset(g, 0, sizeof(mem_aln_t) * 2);
n_aa[0] = n_aa[1] = 0;
n_pri[0] = mem_mark_primary_se(opt, a[0].n, a[0].a, id << 1 | 0);
n_pri[1] = mem_mark_primary_se(opt, a[1].n, a[1].a, id << 1 | 1);
if (opt->flag & MEM_F_PRIMARY5) {
mem_reorder_primary5(opt->T, &a[0]);
mem_reorder_primary5(opt->T, &a[1]);
}
if (opt->flag & MEM_F_NOPAIRING)
goto no_pairing;
// pairing single-end hits
if (n_pri[0] && n_pri[1] && (o = mem_pair(opt, bns, pac, pes, s, a, id, &subo, &n_sub, z, n_pri)) > 0) {
int is_multi[2], q_pe, score_un, q_se[2];
char** XA[2];
// check if an end has multiple hits even after mate-SW
for (i = 0; i < 2; ++i) {
for (j = 1; j < n_pri[i]; ++j)
if (a[i].a[j].secondary < 0 && a[i].a[j].score >= opt->T)
break;
is_multi[i] = j < n_pri[i] ? 1 : 0;
}
if (is_multi[0] || is_multi[1])
goto no_pairing; // TODO: in rare cases, the true hit may be long but with low score
// compute mapQ for the best SE hit
score_un = a[0].a[0].score + a[1].a[0].score - opt->pen_unpaired;
// q_pe = o && subo < o? (int)(MEM_MAPQ_COEF * (1. - (double)subo / o) * log(a[0].a[z[0]].seedcov + a[1].a[z[1]].seedcov) + .499) : 0;
subo = subo > score_un ? subo : score_un;
q_pe = raw_mapq(o - subo, opt->a);
if (n_sub > 0)
q_pe -= (int)(4.343 * log(n_sub + 1) + .499);
if (q_pe < 0)
q_pe = 0;
if (q_pe > 60)
q_pe = 60;
q_pe = (int)(q_pe * (1. - .5 * (a[0].a[0].frac_rep + a[1].a[0].frac_rep)) + .499);
// the following assumes no split hits
if (o > score_un) { // paired alignment is preferred
mem_alnreg_t* c[2];
c[0] = &a[0].a[z[0]];
c[1] = &a[1].a[z[1]];
for (i = 0; i < 2; ++i) {
if (c[i]->secondary >= 0)
c[i]->sub = a[i].a[c[i]->secondary].score, c[i]->secondary = -2;
q_se[i] = mem_approx_mapq_se(opt, c[i]);
}
q_se[0] = q_se[0] > q_pe ? q_se[0] : q_pe < q_se[0] + 40 ? q_pe : q_se[0] + 40;
q_se[1] = q_se[1] > q_pe ? q_se[1] : q_pe < q_se[1] + 40 ? q_pe : q_se[1] + 40;
extra_flag |= 2;
// cap at the tandem repeat score
q_se[0] = q_se[0] < raw_mapq(c[0]->score - c[0]->csub, opt->a) ? q_se[0] : raw_mapq(c[0]->score - c[0]->csub, opt->a);
q_se[1] = q_se[1] < raw_mapq(c[1]->score - c[1]->csub, opt->a) ? q_se[1] : raw_mapq(c[1]->score - c[1]->csub, opt->a);
} else { // the unpaired alignment is preferred
z[0] = z[1] = 0;
q_se[0] = mem_approx_mapq_se(opt, &a[0].a[0]);
q_se[1] = mem_approx_mapq_se(opt, &a[1].a[0]);
}
for (i = 0; i < 2; ++i) {
int k = a[i].a[z[i]].secondary_all;
if (k >= 0 && k < n_pri[i]) { // switch secondary and primary if both of them are non-ALT
assert(a[i].a[k].secondary_all < 0);
for (j = 0; j < a[i].n; ++j)
if (a[i].a[j].secondary_all == k || j == k)
a[i].a[j].secondary_all = z[i];
a[i].a[z[i]].secondary_all = -1;
}
}
if (!(opt->flag & MEM_F_ALL)) {
for (i = 0; i < 2; ++i) XA[i] = mem_gen_alt(opt, bns, pac, &a[i], s[i].l_seq, s[i].seq);
} else
XA[0] = XA[1] = 0;
// write SAM
for (i = 0; i < 2; ++i) {
h[i] = mem_reg2aln(opt, bns, pac, s[i].l_seq, s[i].seq, &a[i].a[z[i]]);
h[i].mapq = q_se[i];
h[i].flag |= 0x40 << i | extra_flag;
h[i].XA = XA[i] ? XA[i][z[i]] : 0;
aa[i][n_aa[i]++] = h[i];
if (n_pri[i] < a[i].n) { // the read has ALT hits
mem_alnreg_t* p = &a[i].a[n_pri[i]];
if (p->score < opt->T || p->secondary >= 0 || !p->is_alt)
continue;
g[i] = mem_reg2aln(opt, bns, pac, s[i].l_seq, s[i].seq, p);
g[i].flag |= 0x800 | 0x40 << i | extra_flag;
g[i].XA = XA[i] ? XA[i][n_pri[i]] : 0;
aa[i][n_aa[i]++] = g[i];
}
}
ss[0].sam.l = 0;
// PROF_START(aln2sam);
for (i = 0; i < n_aa[0]; ++i) mem_aln2sam(opt, bns, &ss[0].sam, &s[0], n_aa[0], aa[0], i, &h[1]); // write read1 hits
ss[1].sam.l = 0;
for (i = 0; i < n_aa[1]; ++i) mem_aln2sam(opt, bns, &ss[1].sam, &s[1], n_aa[1], aa[1], i, &h[0]); // write read2 hits
if (strcmp(s[0].name, s[1].name) != 0)
err_fatal(__func__, "paired reads have different names: \"%s\", \"%s\"\n", s[0].name, s[1].name);
// free
for (i = 0; i < 2; ++i) {
free(h[i].cigar);
free(g[i].cigar);
if (XA[i] == 0)
continue;
for (j = 0; j < a[i].n; ++j) free(XA[i][j]);
free(XA[i]);
}
} else
goto no_pairing;
goto end_clear;
no_pairing:
for (i = 0; i < 2; ++i) {
int which = -1;
if (a[i].n) {
if (a[i].a[0].score >= opt->T)
which = 0;
else if (n_pri[i] < a[i].n && a[i].a[n_pri[i]].score >= opt->T)
which = n_pri[i];
}
if (which >= 0)
h[i] = mem_reg2aln(opt, bns, pac, s[i].l_seq, s[i].seq, &a[i].a[which]);
else
h[i] = mem_reg2aln(opt, bns, pac, s[i].l_seq, s[i].seq, 0);
}
if (!(opt->flag & MEM_F_NOPAIRING) && h[0].rid == h[1].rid &&
h[0].rid >= 0) { // if the top hits from the two ends constitute a proper pair, flag it.
int64_t dist;
int d;
d = mem_infer_dir(bns->l_pac, a[0].a[0].rb, a[1].a[0].rb, &dist);
if (!pes[d].failed && dist >= pes[d].low && dist <= pes[d].high)
extra_flag |= 2;
}
mem_reg2sam(opt, bns, pac, &s[0], &a[0], 0x41 | extra_flag, &h[1], &ss[0]);
mem_reg2sam(opt, bns, pac, &s[1], &a[1], 0x81 | extra_flag, &h[0], &ss[1]);
if (strcmp(s[0].name, s[1].name) != 0)
err_fatal(__func__, "paired reads have different names: \"%s\", \"%s\"\n", s[0].name, s[1].name);
free(h[0].cigar);
free(h[1].cigar);
end_clear:
// 清理seq相关的内存空间
_destory_clear_vec(s[0].msw_task);
_destory_clear_vec(s[1].msw_task);
free(a[0].a);
free(a[1].a);
}
// 最后再计算并生成sam数据
static void workder_gen_sam(void* data, long idx, int tid) {
mem_worker_t* w = (mem_worker_t*)data;
const mem_opt_t* opt = w->opt;
const bntseq_t* bns = w->bns;
const uint8_t* pac = w->pac;
const mem_pestat_t *pes = w->pes;
int startIdx = idx * opt->batch_size;
int endIdx = (idx + 1) * opt->batch_size;
if (endIdx > w->n_reads)
endIdx = w->n_reads;
int id = 0, i = 0, j = 0, k = 0;
#ifdef SAM_EXACT
mem_alnreg_v bb[2];
kv_init(bb[0]); kv_init(bb[1]);
#endif
for (id = startIdx; id < endIdx; id += 2) {
// 初始化变量
bseq1_t* s = &w->seqs[id];
mem_alnreg_v* a = &w->regs[id];
seq_sam_t* ss = &w->sams[id];
#if 0
int cmp = strcmp("ERR194147.1629456", s[0].name) == 0;
// int cmp = (id + w->n_processed) == 202924;
if (cmp == 1) {
fprintf(stderr, "%s\n", s[0].name);
fflush(stderr);
//exit(0);
}
#endif
int n[2] = {0};
#ifndef SAM_EXACT
int nn[2] = {0};
#endif
#ifdef SAM_EXACT
_clear_vec(bb[0]);
_clear_vec(bb[1]);
for (i = 0; i < 2; ++i) {
for (k = 0; k < s[!i].msw_task.n; ++k) {
msw_seq_task_t st = s[!i].msw_task.a[k];
msw_task_t* t = &w->msw->t_msw_tasks[st.thread_idx][st.arr_idx].a[st.task_idx];
kv_push(mem_alnreg_t, bb[i], a[i].a[t->aj]);
}
}
#endif
for (i = 0; i < 2; ++i) {
int si = !i;
#ifndef SAM_EXACT
int origin_n = a[si].n;
#endif
// 这里应该先给task排序因为u8和i16是分开排序的需要合在一起不用了添加task的时候已经排序过了
for (k = 0; k < s[si].msw_task.n; ++k) {
msw_seq_task_t st = s[si].msw_task.a[k];
msw_task_t* t = &w->msw->t_msw_tasks[st.thread_idx][st.arr_idx].a[st.task_idx];
#ifdef SAM_EXACT
mem_alnreg_t* b = &bb[i].a[k];
#else
mem_alnreg_t* b = &a[i].a[t->aj];
#endif
mem_alnreg_v* ma = &a[si];
uint8_t skip = 0;
if (n[si]) { // 添加了新的aln需要检测skip
#ifdef SAM_EXACT
for (j = 0; j < ma->n; ++j) { // check which orinentation has been found
#else
for (j = origin_n; j < ma->n; ++j) {
#endif
int64_t dist;
int r;
r = mem_infer_dir(bns->l_pac, b->rb, ma->a[j].rb, &dist);
if (dist >= pes[r].low && dist <= pes[r].high)
skip |= 1 << r;
}
}
// 检查新添加的aln是不是把当前的任务skip掉了
if ((t->skip | skip) == 15)
continue;
#if 0
kswr_avx_t aln = t->aln; // fprintf(gf[0], "id-%ld score-%d te-%d qe-%d score2-%d te2-%d tb-%d qb-%d\n", s[si].id +
// w->n_processed, aln.score, aln.te, aln.qe, aln.score2, aln.te2, aln.tb, aln.qb);
if (cmp) {
fprintf(stderr, "id-%ld score-%d te-%d qe-%d score2-%d te2-%d tb-%d qb-%d\n", s[si].id + w->n_processed, aln.score, aln.te,
aln.qe, aln.score2, aln.te2, aln.tb, aln.qb); fflush(stderr);
}
#endif
#ifndef SAM_EXACT
nn[si] += 1;
#endif
n[si] += check_add_align(opt, t->aln, t->is_rev, bns->l_pac, b, s[si].l_seq, (uint8_t*)s[si].seq, ma, t->rb);
}
}
// 处理完2个pair read之后再排序因为上面操作有插入排序后之前记录的索引就无效了所以上面不能排序要两个pair read处理完之后再排序
#ifndef SAM_EXACT
for (i = 0; i < 2; ++i) {
if (nn[i] > 0) {
a[i].n = mem_sort_dedup_patch(opt, 0, 0, 0, a[i].n, a[i].a);
}
}
#endif
////////////////////////////////////////////////////////////
// 这里需要传入全局id
generate_sam(opt, bns, pac, pes, (w->n_processed + id) >> 1, s, a, ss, w->n_processed, tid);
}
#ifdef SAM_EXACT
_destory_clear_vec(bb[0]);
_destory_clear_vec(bb[1]);
#endif
}
// 划分matesw任务
static void gather_matesw_task(mem_worker_t* w, msw_task_v** thread_tasks) {
//_clear_vec(w->msw->p_msw_tasks_u8);
//_clear_vec(w->msw->p_msw_tasks_i16);
_destory_clear_vec(w->msw->p_msw_tasks_u8);
_destory_clear_vec(w->msw->p_msw_tasks_i16);
int i = 0, j = 0;
for (i = 0; i < w->opt->n_threads; ++i) {
for (j = 0; j < thread_tasks[i][0].n; ++j) {
msw_task_t* tp = &thread_tasks[i][0].a[j];
kv_push(msw_task_t*, w->msw->p_msw_tasks_u8, tp);
}
for (j = 0; j < thread_tasks[i][1].n; ++j) {
msw_task_t* tp = &thread_tasks[i][1].a[j];
kv_push(msw_task_t*, w->msw->p_msw_tasks_i16, tp);
}
}
}
// 更新stats
static void update_msw_stats(mem_worker_t* w) {
int i = 0, max_seq_len = 0, max_ref_len = 0;
for (i = 0; i < w->opt->n_threads; ++i) {
max_ref_len = max_(max_ref_len, w->msw->t_msw_stats[i].max_ref_len);
max_seq_len = max_(max_seq_len, w->msw->t_msw_stats[i].max_seq_len);
}
int quanta = ((max_seq_len + 16 - 1) / 16) * 16; // based on SSE-8 bit lane
max_seq_len = quanta + 1; // 这里需要加一因为ksw_avx512里赋值的时候是 <= ncol
w->msw->p_msw_stats.max_ref_len = max_ref_len + 1;
w->msw->p_msw_stats.max_seq_len = max_seq_len;
}
// 开辟缓冲区
static void alloc_update_cache_avx512(mem_worker_t* w) {
int i = 0;
for (i = 0; i < w->opt->n_threads; ++i) {
msw_buf_t* b = &w->msw->t_msw_buf[i];
// 更新跟ref len有关的缓冲区
if (b->ref_len < w->msw->p_msw_stats.max_ref_len) {
b->ref_len = w->msw->p_msw_stats.max_ref_len;
if (b->refArr) _mm_free(b->refArr);
if (b->rowMax) _mm_free(b->rowMax);
b->refArr = (uint8_t*)_mm_malloc(b->ref_len * SIMD512_WIDTH8 * sizeof(uint8_t), 64);
b->rowMax = (uint8_t*)_mm_malloc(b->ref_len * SIMD512_WIDTH8 * sizeof(uint8_t), 64);
}
// 更新跟seq len有关的缓冲区
if (b->seq_len < w->msw->p_msw_stats.max_seq_len) {
b->seq_len = w->msw->p_msw_stats.max_seq_len;
if (b->seqArr) _mm_free(b->seqArr);
if (b->H0) _mm_free(b->H0);
if (b->H1) _mm_free(b->H1);
if (b->Hmax) _mm_free(b->Hmax);
if (b->F) _mm_free(b->F);
b->seqArr = (uint8_t*)_mm_malloc(b->seq_len * SIMD512_WIDTH8 * sizeof(uint8_t), 64);
b->H0 = (uint8_t*)_mm_malloc(b->seq_len * SIMD512_WIDTH8 * sizeof(uint8_t), 64);
b->H1 = (uint8_t*)_mm_malloc(b->seq_len * SIMD512_WIDTH8 * sizeof(uint8_t), 64);
b->Hmax = (uint8_t*)_mm_malloc(b->seq_len * SIMD512_WIDTH8 * sizeof(uint8_t), 64);
b->F = (uint8_t*)_mm_malloc(b->seq_len * SIMD512_WIDTH8 * sizeof(uint8_t), 64);
}
}
}
// 针对pair end数据生成sam的过程
void gen_paired_sam(mem_worker_t* w) {
if (w->opt->flag & MEM_F_NO_RESCUE) {
kt_for(w->opt->n_threads, worker_sam, w, w->n_reads >> 1); // generate alignment
} else {
// 清空一下任务数组,不能放到线程里去做
int i = 0;
for (i = 0; i < w->opt->n_threads; ++i) {
//_clear_vec(w->msw->t_msw_tasks[i][0]);
//_clear_vec(w->msw->t_msw_tasks[i][1]);
_destory_clear_vec(w->msw->t_msw_tasks[i][0]);
_destory_clear_vec(w->msw->t_msw_tasks[i][1]);
}
int batch_n = (w->n_reads + w->opt->batch_size - 1) / w->opt->batch_size;
// 1. 计算哪些read需要matesw
PROF_START(get_matesw_data);
kt_for(w->opt->n_threads, worker_matesw_tasks, w, batch_n);
PROF_END(gprof[G_get_matesw_data], get_matesw_data);
// 更新stats
PROF_START(update_stats_cache);
update_msw_stats(w);
// 开辟缓冲区
alloc_update_cache_avx512(w);
PROF_END(gprof[G_update_stats_cache], update_stats_cache);
// 2. 收集每个线程中的msw任务
PROF_START(gather_matesw_task);
gather_matesw_task(w, w->msw->t_msw_tasks);
PROF_END(gprof[G_gather_matesw_task], gather_matesw_task);
PROF_START(calc_matesw);
int msw_batch_n = 0;
// 3. 处理msw任务
if (w->msw->p_msw_tasks_u8.n > 0) {
msw_batch_n = (w->msw->p_msw_tasks_u8.n + w->opt->msw_batch_size - 1) / w->opt->msw_batch_size;
kt_for(w->opt->n_threads, worker_calc_matesw_avx512_u8, w, msw_batch_n);
}
if (w->msw->p_msw_tasks_i16.n > 0) {
msw_batch_n = (w->msw->p_msw_tasks_i16.n + w->opt->msw_batch_size - 1) / w->opt->msw_batch_size;
kt_for(w->opt->n_threads, worker_calc_matesw_avx512_i16, w, msw_batch_n);
}
PROF_END(gprof[G_calc_matesw], calc_matesw);
// 4. 生成sam
PROF_START(gen_sam);
kt_for(w->opt->n_threads, workder_gen_sam, w, batch_n);
PROF_END(gprof[G_gen_sam], gen_sam);
}
fprintf(stderr, "zzh : u8: %ld i16: %ld\n", w->msw->p_msw_tasks_u8.n, w->msw->p_msw_tasks_i16.n);
}