完成了matesw的代码,但是还有bug,结果不一致

This commit is contained in:
Zhang Zhonghai 2026-01-13 23:37:06 +08:00
parent 64b6f1fab2
commit 4c5e1e2fa1
14 changed files with 631 additions and 285 deletions

1
.gitignore vendored
View File

@ -1,4 +1,5 @@
*.log
*.sam
# Prerequisites
*.d

4
.vscode/launch.json vendored
View File

@ -13,7 +13,7 @@
"args": [
"mem",
"-t",
"1",
"32",
"-M",
"-R",
"'@RG\\tID:normal\\tSM:normal\\tPL:illumina\\tLB:normal\\tPG:bwa'",
@ -21,7 +21,7 @@
"~/data/dataset/D1/n1.fq.gz",
"~/data/dataset/D1/n2.fq.gz",
"-o",
"/dev/null",
"D1-out.sam",
],
"cwd": "${workspaceFolder}", //
},

View File

@ -1,5 +1,5 @@
CC= gcc
CFLAGS= -g -Wall -Wno-unused-function -mavx2 -mavx512bw -O2
CFLAGS= -g -Wall -Wno-unused-function -mavx2 -mavx512bw -O3
WRAP_MALLOC=-DUSE_MALLOC_WRAPPERS
SHOW_PERF= -DSHOW_PERF
@ -16,7 +16,9 @@ AOBJS= bwashm.o bwase.o bwaseqio.o bwtgap.o bwtaln.o bamlite.o \
bwtsw2_core.o bwtsw2_main.o bwtsw2_aux.o bwt_lite.o \
bwtsw2_chain.o fastmap.o bwtsw2_pair.o profiling.o \
fmt_idx.o ksw_extend2_avx2.o ksw_extend2_avx2_u8.o \
debug.o paired_sam.o ksw_align_avx2.o ksw_align_avx512.o
debug.o paired_sam.o ksw_align_avx2.o ksw_align_avx512.o \
mate_sw.o
PROG= fastalign
INCLUDES=
LIBS= -lm -lz -lpthread -ldl

9
bwa.h
View File

@ -35,6 +35,7 @@
#include "kvec.h"
#include "ksw.h"
#include "ksw_align_avx.h"
#include "mate_sw.h"
#define BWA_IDX_BWT 0x1
#define BWA_IDX_BNS 0x2
@ -62,7 +63,7 @@ typedef struct {
uint8_t *mem;
} bwaidx_t;
#if 0
// 需要做mate sw的read
typedef struct {
int is_rev; // seq是否在反向互补链上
@ -78,13 +79,15 @@ typedef struct {
} matesw_data_v;
typedef kvec_t(matesw_data_t*) matesw_ptr_v;
#endif
typedef struct {
int l_seq, id;
int m_name, m_comment, m_seq, m_qual;
char *name, *comment, *seq, *qual;
matesw_ptr_v msw;
kstring_t sam;
msw_task_ptr_v msw;
//int_v msw;
// kstring_t sam;
} bseq1_t;
typedef struct {

View File

@ -1072,7 +1072,7 @@ void mem_reorder_primary5(int T, mem_alnreg_v *a)
// TODO (future plan): group hits into a uint64_t[] array. This will be cleaner and more flexible
void mem_reg2sam(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, bseq1_t *s, mem_alnreg_v *a, int extra_flag, const mem_aln_t *m, seq_sam_t *ss)
{
extern char **mem_gen_alt(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, mem_alnreg_v *a, int l_query, const char *query);
extern char **mem_gen_alt(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_alnreg_v *a, int l_query, const char *query);
// kstring_t str;
kvec_t(mem_aln_t) aa;
int k, l;
@ -1290,6 +1290,9 @@ mem_worker_t *init_mem_worker(const mem_opt_t *opt, const FMTIndex *fmt, const b
w->smem_arr = malloc(i * sizeof(smem_v *));
w->chain_arr = malloc(i * sizeof(mem_chain_v *));
w->isize_arr = malloc(i * sizeof(uint64_v *));
w->msw = calloc(1, sizeof(msw_data_t));
init_msw_data(w->msw, opt->n_threads, opt->msw_batch_size);
#if 0
w->matesw_arr8 = malloc(i * sizeof(matesw_data_v*));
w->matesw_arr16 = malloc(i * sizeof(matesw_data_v*));
w->msw_buf = calloc(i, sizeof(matesw_buf_t));
@ -1304,15 +1307,17 @@ mem_worker_t *init_mem_worker(const mem_opt_t *opt, const FMTIndex *fmt, const b
w->msw_2_seq = calloc(i, sizeof(msw_seq_v));
w->msw_2_kswr = calloc(i, sizeof(msw_kswr_v));
w->msw_2_xtra = calloc(i, sizeof(int_v));
#endif
for (i = 0; i < opt->n_threads; ++i) {
w->aux[i] = smem_aux_init();
w->smem_arr[i] = malloc(opt->batch_size * sizeof(smem_v));
w->chain_arr[i] = malloc(opt->batch_size * sizeof(mem_chain_v));
w->isize_arr[i] = calloc(4, sizeof(uint64_v));
#if 0
w->matesw_arr8[i] = calloc(1, sizeof(matesw_data_v));
w->matesw_arr16[i] = calloc(1, sizeof(matesw_data_v));
w->msw_2_arr8[i] = calloc(1, sizeof(matesw_data_t*));
#endif
for (j = 0; j < opt->batch_size; ++j) {
kv_init(w->smem_arr[i][j].mem);
kv_init(w->smem_arr[i][j].pos_arr);

View File

@ -27,12 +27,12 @@
#ifndef BWAMEM_H_
#define BWAMEM_H_
#include "bwt.h"
#include "bntseq.h"
#include "bwa.h"
#include "bwt.h"
#include "fmt_idx.h"
#include "utils.h"
#include "ksw_align_avx.h"
#include "utils.h"
#define MEM_MAPQ_COEF 30.0
#define MEM_MAPQ_MAX 60
@ -154,6 +154,7 @@ typedef struct {
uint64_v pos_arr;
} smem_v;
#if 0
typedef struct {
int start_tid;
int start_idx;
@ -168,8 +169,6 @@ typedef struct {
int max_msw_ref_len; // mate sw里的最长的ref长度
} mem_stats_t; // 记录一些用到的数据统计比如最长ref长度最长seq长度等
typedef kvec_t(byte_v) byte_vv;
typedef struct {
uint8_t* seq; // 只是指向seq的指针
int l_seq; // seq长度
@ -186,6 +185,8 @@ typedef kvec_t(msw_ref_t) msw_ref_v;
typedef kvec_t(kswr_avx_t) msw_kswr_v;
#endif
typedef struct {
int calc_isize;
const mem_opt_t *opt;
@ -201,6 +202,10 @@ typedef struct {
mem_chain_v **chain_arr;
mem_alnreg_v *regs;
uint64_v **isize_arr;
msw_data_t *msw;
#if 0
matesw_data_v** matesw_arr8; // 收集需要做mate sw的任务的数据
matesw_ptr_v** msw_2_arr8; // 第二阶段任务,每个线程单独收集
matesw_data_v** matesw_arr16;
@ -222,6 +227,7 @@ typedef struct {
msw_seq_v* msw_2_seq;
msw_kswr_v* msw_2_kswr;
int_v* msw_2_xtra;
#endif
int64_t n_processed;
int64_t n; // regs元素个数可动态扩展

View File

@ -6,6 +6,7 @@
#include "ksw.h"
#define AMBIG_ 4 // ambiguous base
// for 16 bit
#define DUMMY1_ 4
@ -40,7 +41,7 @@ typedef struct {
uint8_t* rowMax;
uint8_t* seqArr;
uint8_t* refArr;
} matesw_buf_t; // inter-query算法的mate sw计算过程需要用到的缓存空间
} msw_buf_t; // inter-query算法的mate sw计算过程需要用到的缓存空间
#ifdef __cplusplus
extern "C" {
@ -56,7 +57,7 @@ void ksw_align_avx512_u8(int8_t w_match, // match分数正数
int8_t e_ins, // 延续一个insert罚分正数
int8_t o_del, // 开始一个delete罚分正数
int8_t e_del, // 延续一个delete罚分正数
matesw_buf_t* cache, // 计算用到的一些数据
msw_buf_t* cache, // 计算用到的一些数据
uint8_t* seq1SoA, // ref序列已经pack好了
uint8_t* seq2SoA, // seq序列
int16_t nrow, // 最长的行数对应ref长度

View File

@ -62,7 +62,7 @@ void ksw_align_avx512_u8(int8_t w_match, // match分数正数
int8_t e_ins, // 延续一个insert罚分正数
int8_t o_del, // 开始一个delete罚分正数
int8_t e_del, // 延续一个delete罚分正数
matesw_buf_t* cache, // 计算用到的一些数据
msw_buf_t* cache, // 计算用到的一些数据
uint8_t* seq1SoA, // ref序列已经pack好了
uint8_t* seq2SoA, // seq序列
int16_t nrow, // 最长的行数对应ref长度

51
mate_sw.c 100644
View File

@ -0,0 +1,51 @@
#include "mate_sw.h"
// 初始化mate sw计算相关的数据
void init_msw_data(msw_data_t* msw, int n_threads, int msw_batch_size) {
#define _alloc_msw(data, data_type) ((data) = calloc(n_threads, sizeof(data_type)))
_alloc_msw(msw->t_msw_tasks_u8, msw_task_v);
_alloc_msw(msw->t_msw_tasks_i16, msw_task_v);
_alloc_msw(msw->t_msw_buf, msw_buf_t);
_alloc_msw(msw->t_msw_stats, msw_stats_t);
_alloc_msw(msw->t_msw_ref_buf, byte_vv);
_alloc_msw(msw->t_msw_seq_buf, byte_vv);
_alloc_msw(msw->t_msw_1_refs, msw_ref_v);
_alloc_msw(msw->t_msw_1_seqs, msw_seq_v);
_alloc_msw(msw->t_msw_1_xtras, int_v);
_alloc_msw(msw->t_msw_1_kswrs, msw_kswr_v);
_alloc_msw(msw->t_msw_2_refs, msw_ref_v);
_alloc_msw(msw->t_msw_2_seqs, msw_seq_v);
_alloc_msw(msw->t_msw_2_xtras, int_v);
_alloc_msw(msw->t_msw_2_kswrs, msw_kswr_v);
// 初始化缓存空间
int i = 0, j = 0;
msw_ref_t init_ref = {0};
msw_seq_t init_seq = {0};
kswr_avx_t init_kswr = {0};
byte_v init_byte_v = {0};
for (i = 0; i < n_threads; ++i) {
for (j = 0; j < msw_batch_size; ++j) {
// for ref seq buf
kv_push(byte_v, msw->t_msw_ref_buf[i], init_byte_v);
kv_push(byte_v, msw->t_msw_seq_buf[i], init_byte_v);
// for msw第一阶段
kv_push(msw_ref_t, msw->t_msw_1_refs[i], init_ref);
kv_push(msw_seq_t, msw->t_msw_1_seqs[i], init_seq);
kv_push(int, msw->t_msw_1_xtras[i], 0);
kv_push(kswr_avx_t, msw->t_msw_1_kswrs[i], init_kswr);
// for msw第二阶段
kv_push(msw_ref_t, msw->t_msw_2_refs[i], init_ref);
kv_push(msw_seq_t, msw->t_msw_2_seqs[i], init_seq);
kv_push(int, msw->t_msw_2_xtras[i], 0);
kv_push(kswr_avx_t, msw->t_msw_2_kswrs[i], init_kswr);
}
}
}

88
mate_sw.h 100644
View File

@ -0,0 +1,88 @@
/*
Description: mate sw
Copyright : All right reserved by ICT
Author : Zhang Zhonghai
Date : 2026/01/12
*/
#pragma once
#include <stdint.h>
#include "ksw_align_avx.h"
#include "kvec.h"
// 需要做mate sw的read
typedef struct {
int is_rev; // seq是否在反向互补链上
int xtra;
int64_t rb, re; // ref的起始截止位置左闭右开
int64_t seq_id; // 对应的当前数据块的seq id
int aj; // 对应的pair-end的第几个aln;
int to; // task order,因为u8和i16分开放入的但是他们也是有顺序的这个用来排序
kswr_avx_t aln;
} msw_task_t;
// mate sw task数组
typedef kvec_t(msw_task_t) msw_task_v;
// mate sw task的指针数组
typedef kvec_t(msw_task_t*) msw_task_ptr_v;
//////////////////////////////////////////////
// 用来存放获取的mate sw阶段的ref序列
typedef kvec_t(byte_v) byte_vv;
// 记录一些用到的数据统计比如最长ref长度最长seq长度等
typedef struct {
int max_seq_len;
int max_ref_len; // mate sw里的最长的ref长度
} msw_stats_t;
// 用于记录参与计算的query只保存指针
typedef struct {
uint8_t* seq; // 只是指向seq的指针
int l_seq; // seq长度
} msw_seq_t;
typedef kvec_t(msw_seq_t) msw_seq_v;
// 用于记录参与计算的ref只保存指针
typedef struct {
uint8_t* ref;
int l_ref;
} msw_ref_t;
typedef kvec_t(msw_ref_t) msw_ref_v;
// 记录结果
typedef kvec_t(kswr_avx_t) msw_kswr_v;
////////////////////////////////////////////
// mate sw相关的所有数据
typedef struct {
msw_task_v* t_msw_tasks_u8; // 线程内收集需要做mate sw的任务的数据
msw_task_v* t_msw_tasks_i16;
msw_task_ptr_v p_msw_tasks_u8; // 线程共享的mate sw task指针
msw_task_ptr_v p_msw_tasks_i16;
msw_buf_t* t_msw_buf; // 每个线程一个mate sw 缓存
msw_stats_t* t_msw_stats; // 每个线程一个,统计信息
msw_stats_t p_msw_stats; //只是用来开辟空间
byte_vv* t_msw_ref_buf; // ref的缓存
byte_vv* t_msw_seq_buf; // seq的缓存因为有些seq需要反向互补而且用了buf之后第二阶段反转不需要再还原了
msw_ref_v* t_msw_1_refs; // msw 第一阶段数据
msw_seq_v* t_msw_1_seqs;
int_v* t_msw_1_xtras;
msw_kswr_v* t_msw_1_kswrs;
msw_ref_v* t_msw_2_refs; // msw 第一阶段数据
msw_seq_v* t_msw_2_seqs;
int_v* t_msw_2_xtras;
msw_kswr_v* t_msw_2_kswrs;
} msw_data_t;
void init_msw_data(msw_data_t* msw, int n_threads, int msw_batch_size);

View File

@ -1,5 +1,6 @@
#include "paired_sam.h"
#include <math.h>
#include <stdint.h>
#include <stdio.h>
@ -53,9 +54,29 @@ static inline uint8_t* get_ref_seq(const bntseq_t* bns, const uint8_t* pac, int6
return buf->a;
}
// 将query序列放入缓存
static inline void get_query_seq(int is_rev, int l_seq, uint8_t *seq, byte_v *query) {
int i = 0;
if (query->m < l_seq) {
kv_resize(uint8_t, *query, l_seq);
}
query->n = l_seq;
if (is_rev) {
for (i = 0; i < l_seq; ++i) query->a[l_seq - 1 - i] = seq[i] < 4 ? 3 - seq[i] : 4;
} else {
for (i = 0; i < l_seq; ++i) query->a[i] = seq[i];
}
}
// 翻转seq
static inline void revseq(int l, uint8_t* s) {
int i, t;
for (i = 0; i < l >> 1; ++i) t = s[i], s[i] = s[l - 1 - i], s[l - 1 - i] = t;
}
// 计算当前seq是否需要做matesw需要的话保存必要的数据
static void get_matesw_data(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, const mem_pestat_t pes[4], const mem_alnreg_t* a,
bseq1_t* seq, mem_alnreg_v* ma, int64_t sid, mem_stats_t *stats, matesw_data_v* msw8, matesw_data_v* msw16) {
static void get_matesw_tasks(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, const mem_pestat_t pes[4], const mem_alnreg_t* a, int a_j,
bseq1_t* seq, mem_alnreg_v* ma, int64_t sid, msw_stats_t* stats, msw_task_v* msw8, msw_task_v* msw16) {
int64_t l_pac = bns->l_pac;
int l_ms = seq->l_seq;
int i, r, skip[4], rid;
@ -69,6 +90,7 @@ static void get_matesw_data(const mem_opt_t* opt, const bntseq_t* bns, const uin
}
if (skip[0] + skip[1] + skip[2] + skip[3] == 4)
return; // consistent pair exist; no need to perform SW
int task_order = 0;
for (r = 0; r < 4; ++r) {
int is_rev, is_larger;
int64_t rb, re; // 左闭右开
@ -90,25 +112,28 @@ static void get_matesw_data(const mem_opt_t* opt, const bntseq_t* bns, const uin
re = l_pac << 1;
rid = get_rid_range(bns, pac, &rb, (rb + re) >> 1, &re); // 计算ref对应的染色体id和区间起始终止位置
if (a->rid == rid && re - rb >= opt->min_seed_len) { // no funny things happening
stats->max_msw_ref_len = max_(stats->max_msw_ref_len, re - rb);
stats->max_ref_len = max_(stats->max_ref_len, re - rb);
// fprintf(stderr, "zzh here\n");
int xtra = KSW_XSUBO | KSW_XSTART | (l_ms * opt->a < 250 ? KSW_XBYTE : 0) | (opt->min_seed_len * opt->a);
matesw_data_t* p;
msw_task_t* p;
if ((xtra & KSW_XBYTE))
p = kv_pushp(matesw_data_t, *msw8);
p = kv_pushp(msw_task_t, *msw8);
else
p = kv_pushp(matesw_data_t, *msw16);
p = kv_pushp(msw_task_t, *msw16);
p->is_rev = is_rev;
p->xtra = xtra;
p->rb = rb;
p->re = re;
p->seq_id = sid;
kv_push(matesw_data_t*, seq->msw, p); // 将matesw任务和对应的seq关联起来
p->aj = a_j;
p->to = task_order++;
// kv_push(msw_task_t*, seq->msw, p); // 将matesw任务和对应的seq关联起来这里放指针是不行的因为指针位置会变要保存offset才行
// kv_push(int, seq->msw, msw8->n - 1); // 这里需要考虑i16, 本线程的msw的offset
}
}
}
// 先计算哪些read需要做matesw
static void worker_matesw_data(void* data, int idx, int tid) {
static void worker_matesw_tasks(void* data, int idx, int tid) {
mem_worker_t* w = (mem_worker_t*)data;
const mem_opt_t* opt = w->opt;
int startIdx = idx * opt->batch_size;
@ -117,179 +142,65 @@ static void worker_matesw_data(void* data, int idx, int tid) {
endIdx = w->n_reads;
int id = 0;
mem_alnreg_v b[2];
int_v aj[2];
kv_init(aj[0]);
kv_init(aj[1]);
kv_init(b[0]);
kv_init(b[1]);
for (id = startIdx; id < endIdx; id += 2) {
int64_t i_s = id;
bseq1_t* s = &w->seqs[i_s];
mem_alnreg_v* a = &w->regs[i_s];
s->msw.n = 0; // 清空之前的matesw数据
int i, j;
mem_alnreg_v b[2];
kv_init(b[0]);
kv_init(b[1]);
_clear_vec(b[0]);
_clear_vec(b[1]);
_clear_vec(aj[0]);
_clear_vec(aj[1]);
// fprintf(stderr, "zzh test\n");
for (i = 0; i < 2; ++i)
for (j = 0; j < a[i].n; ++j)
if (a[i].a[j].score >= a[i].a[0].score - opt->pen_unpaired)
if (a[i].a[j].score >= a[i].a[0].score - opt->pen_unpaired) {
kv_push(mem_alnreg_t, b[i], a[i].a[j]);
kv_push(int, aj[i], j);
}
for (i = 0; i < 2; ++i)
for (j = 0; j < b[i].n && j < opt->max_matesw; ++j)
get_matesw_data(opt, w->bns, w->pac, w->pes, &b[i].a[j], &s[!i], &a[!i], i_s + !i, &w->mem_stats[tid], w->matesw_arr8[tid],
w->matesw_arr16[tid]);
free(b[0].a);
free(b[1].a);
get_matesw_tasks(opt, w->bns, w->pac, w->pes, &b[i].a[j], aj[i].a[j], &s[!i], &a[!i], i_s + !i, &w->msw->t_msw_stats[tid],
&w->msw->t_msw_tasks_u8[tid], &w->msw->t_msw_tasks_i16[tid]);
}
free(b[0].a);
free(b[1].a);
free(aj[0].a);
free(aj[1].a);
}
// 再多线程计算matesw利用inter-query的simd并行
static void worker_calc_matesw_avx512_u8(void* data, int idx, int tid) {
mem_worker_t* w = (mem_worker_t*)data;
const mem_opt_t* opt = w->opt;
int startIdx = idx * w->opt->msw_batch_size;
int endIdx = (idx + 1) * w->opt->msw_batch_size;
if (endIdx > w->msw_tasks8.n)
endIdx = w->msw_tasks8.n;
int roundEndIdx = ((endIdx + SIMD512_WIDTH8 - 1) / SIMD512_WIDTH8) * SIMD512_WIDTH8;
static void msw_avx512_u8(const mem_opt_t* opt, int num_tasks, msw_buf_t* msw_buf, msw_ref_v* refs, msw_seq_v* seqs, int_v* xtras,
msw_kswr_v* kswrs, int tid) {
int i = 0, j = 0, k = 0;
int refLen[SIMD512_WIDTH8] = {0};
matesw_buf_t* msw_buf = &w->msw_buf[tid]; // 缓冲区
byte_vv* refs = &w->msw_ref[tid];
msw_seq_v* seqs = &w->msw_seq[tid];
int_v* xtras = &w->msw_xtra[tid];
msw_kswr_v* kswrs = &w->msw_kswr[tid];
// 获取ref和seq数据
PROF_START(msw_get_ref);
for (i = startIdx, j = 0; i < endIdx; ++i, ++j) {
matesw_data_t* task = kv_A(w->msw_tasks8, i);
// 1. 获取对应的ref
byte_v* ref = &kv_A(*refs, j);
get_ref_seq(w->bns, w->pac, task->rb, task->re, ref);
// 2. 获取对应的seq
msw_seq_t* seq = &kv_A(*seqs, j);
seq->seq = (uint8_t*)w->seqs[task->seq_id].seq;
seq->l_seq = w->seqs[task->seq_id].l_seq;
// 3. 其他数据
xtras->a[j] = task->xtra;
}
for (; i < roundEndIdx; ++i, ++j) {
byte_v* ref = &kv_A(*refs, j);
ref->n = 0;
msw_seq_t* seq = &kv_A(*seqs, j);
seq->l_seq = 0;
xtras->a[j] = 0;
}
PROF_END(tprof[T_MSW_GET_REF][tid], msw_get_ref);
// 准备数据并进行计算
for (i = 0; i < endIdx - startIdx; i += SIMD512_WIDTH8) {
for (i = 0; i < num_tasks; i += SIMD512_WIDTH8) {
// 将ref和seq赋值给对应的用来计算的缓存
uint8_t* mySeq1SoA = msw_buf->refArr;
uint8_t* mySeq2SoA = msw_buf->seqArr;
int maxSeqLen = 0, maxRefLen = 0;
PROF_START(msw_pack_seq);
// 处理ref
// for test
#if 0
for (j = 0; j < SIMD512_WIDTH8; ++j) {
byte_v* ref = &refs->a[i + j];
uint8_t* seq1 = ref->a;
refLen[j] = ref->n;
// 处理ref
for (k = 0; k < ref->n; ++k) {
mySeq1SoA[k * SIMD512_WIDTH8 + j] = (seq1[k] == AMBIG_ ? AMBR : seq1[k]);
}
maxRefLen = max_(maxRefLen, ref->n);
fprintf(stderr, "r: %d, s: %d\n", refs->a[i + j].l_ref, seqs->a[i + j].l_seq);
fflush(stderr);
}
for (j = 0; j < SIMD512_WIDTH8; ++j) {
byte_v* ref = &refs->a[i + j];
for (k = ref->n; k < maxRefLen; ++k) {
mySeq1SoA[k * SIMD512_WIDTH8 + j] = 0xFF;
}
}
// 处理query
for (j = 0; j < SIMD512_WIDTH8; ++j) {
msw_seq_t* seq = &kv_A(*seqs, i + j);
uint8_t* seq2 = seq->seq;
int quanta = ((seq->l_seq + 16 - 1) / 16) * 16; // based on SSE-8 bit lane
for (k = 0; k < seq->l_seq; k++) {
mySeq2SoA[k * SIMD512_WIDTH8 + j] = (seq2[k] == AMBIG_ ? AMBQ : seq2[k]);
}
for (; k < quanta; ++k) {
mySeq2SoA[k * SIMD512_WIDTH8 + j] = DUMMY5; // SSE quanta
}
maxSeqLen = max_(maxSeqLen, quanta);
}
for (j = 0; j < SIMD512_WIDTH8; ++j) {
msw_seq_t* seq = &kv_A(*seqs, i + j);
int quanta = ((seq->l_seq + 16 - 1) / 16) * 16; // based on SSE-8 bit lane
for (k = quanta; k < maxSeqLen; k++) {
mySeq2SoA[k * SIMD512_WIDTH8 + j] = 0xFF;
}
}
PROF_END(tprof[T_MSW_PACK_SEQ][tid], msw_pack_seq);
// 利用smid指令计算
PROF_START(msw_1);
ksw_align_avx512_u8(opt->a, -1 * opt->b, opt->o_ins, opt->e_ins, opt->o_del, opt->e_del, msw_buf, mySeq1SoA, mySeq2SoA, maxRefLen, maxSeqLen,
&xtras->a[i], refLen, &kswrs->a[i], 0);
PROF_END(tprof[T_MSW_1][tid], msw_1);
}
#if 1
msw_ref_v* refs2 = &w->msw_2_ref[tid];
msw_seq_v* seqs2 = &w->msw_2_seq[tid];
int_v* xtras2 = &w->msw_2_xtra[tid];
msw_kswr_v* kswrs2 = &w->msw_2_kswr[tid];
int_v msw_task_ids; // 用于第二阶段任务统计
kv_init(msw_task_ids);
// 保存结果并筛选第二阶段msw计算的任务
for (i = startIdx, j = 0, k = 0; i < endIdx; ++i, ++j) {
matesw_data_t* task = kv_A(w->msw_tasks8, i);
task->aln = kswrs->a[j];
if ((task->xtra & KSW_XSTART) == 0 || ((task->xtra & KSW_XSUBO) && task->aln.score < (task->xtra & 0xffff)))
continue;
task->xtra = KSW_XSTOP | task->aln.score;
kv_push(int, msw_task_ids, j);
// 1. 获取对应的ref
byte_v* ref = &kv_A(*refs, j);
msw_ref_t* ref2 = &kv_A(*refs2, k);
ref2->l_ref = ref->n;
ref2->ref = ref->a;
// 2. 获取对应的seq
msw_seq_t* seq = &kv_A(*seqs, j);
msw_seq_t* seq2 = &kv_A(*seqs2, k);
seq2->seq = seq->seq;
seq2->l_seq = seq->l_seq;
// 3. 其他数据
xtras2->a[k] = task->xtra;
++k;
}
roundEndIdx = ((msw_task_ids.n + SIMD512_WIDTH8 - 1) / SIMD512_WIDTH8) * SIMD512_WIDTH8;
for (; k < roundEndIdx; ++k) {
msw_ref_t* ref2 = &kv_A(*refs2, k);
ref2->l_ref = 0;
msw_seq_t* seq2 = &kv_A(*seqs2, k);
seq2->l_seq = 0;
xtras2->a[k] = 0;
}
// 准备数据并进行计算
for (i = 0; i < msw_task_ids.n; i += SIMD512_WIDTH8) {
// 将ref和seq赋值给对应的用来计算的缓存
uint8_t* mySeq1SoA = msw_buf->refArr;
uint8_t* mySeq2SoA = msw_buf->seqArr;
int maxSeqLen = 0, maxRefLen = 0;
#endif
PROF_START(msw_pack_seq);
// 处理ref
for (j = 0; j < SIMD512_WIDTH8; ++j) {
msw_ref_t* ref = &refs2->a[i + j];
msw_ref_t* ref = &refs->a[i + j];
uint8_t* seq1 = ref->ref;
refLen[j] = ref->l_ref;
// 处理ref
@ -300,15 +211,15 @@ static void worker_calc_matesw_avx512_u8(void* data, int idx, int tid) {
}
for (j = 0; j < SIMD512_WIDTH8; ++j) {
msw_ref_t* ref = &refs2->a[i + j];
for (k = ref->l_ref; k < maxRefLen; ++k) {
msw_ref_t* ref = &refs->a[i + j];
for (k = ref->l_ref; k <= maxRefLen; ++k) {
mySeq1SoA[k * SIMD512_WIDTH8 + j] = 0xFF;
}
}
// 处理query
for (j = 0; j < SIMD512_WIDTH8; ++j) {
msw_seq_t* seq = &kv_A(*seqs2, i + j);
msw_seq_t* seq = &seqs->a[i + j];
uint8_t* seq2 = seq->seq;
int quanta = ((seq->l_seq + 16 - 1) / 16) * 16; // based on SSE-8 bit lane
for (k = 0; k < seq->l_seq; k++) {
@ -320,144 +231,382 @@ static void worker_calc_matesw_avx512_u8(void* data, int idx, int tid) {
maxSeqLen = max_(maxSeqLen, quanta);
}
for (j = 0; j < SIMD512_WIDTH8; ++j) {
msw_seq_t* seq = &kv_A(*seqs2, i + j);
msw_seq_t* seq = &seqs->a[i + j];
int quanta = ((seq->l_seq + 16 - 1) / 16) * 16; // based on SSE-8 bit lane
for (k = quanta; k < maxSeqLen; k++) {
for (k = quanta; k <= maxSeqLen; k++) {
mySeq2SoA[k * SIMD512_WIDTH8 + j] = 0xFF;
}
}
PROF_END(tprof[T_MSW_PACK_SEQ][tid], msw_pack_seq);
// 利用smid指令计算
PROF_START(msw_2);
ksw_align_avx512_u8(opt->a, -1 * opt->b, opt->o_ins, opt->e_ins, opt->o_del, opt->e_del, msw_buf, mySeq1SoA, mySeq2SoA, maxRefLen, maxSeqLen,
&xtras2->a[i], refLen, &kswrs2->a[i], 1);
PROF_END(tprof[T_MSW_2][tid], msw_2);
&xtras->a[i], refLen, &kswrs->a[i], 0);
}
}
// 再多线程计算matesw利用inter-query的simd并行
static void worker_calc_matesw_avx512_u8(void* data, int idx, int tid) {
mem_worker_t* w = (mem_worker_t*)data;
const mem_opt_t* opt = w->opt;
int startIdx = idx * w->opt->msw_batch_size;
int endIdx = (idx + 1) * w->opt->msw_batch_size;
if (endIdx > w->msw->p_msw_tasks_u8.n)
endIdx = w->msw->p_msw_tasks_u8.n;
int roundEndIdx = ((endIdx + SIMD512_WIDTH8 - 1) / SIMD512_WIDTH8) * SIMD512_WIDTH8;
int i = 0, j = 0, k = 0;
msw_buf_t* msw_buf = &w->msw->t_msw_buf[tid]; // 缓冲区
byte_vv* ref_bufs = &w->msw->t_msw_ref_buf[tid];
byte_vv* seq_bufs = &w->msw->t_msw_seq_buf[tid];
msw_ref_v* refs = &w->msw->t_msw_1_refs[tid];
msw_seq_v* seqs = &w->msw->t_msw_1_seqs[tid];
int_v* xtras = &w->msw->t_msw_1_xtras[tid];
msw_kswr_v* kswrs = &w->msw->t_msw_1_kswrs[tid];
// 获取ref和seq数据
PROF_START(msw_get_ref);
for (i = startIdx, j = 0; i < endIdx; ++i, ++j) {
msw_task_t* task = w->msw->p_msw_tasks_u8.a[i];
// 1. 获取对应的ref
byte_v* ref_buf = &ref_bufs->a[j];
get_ref_seq(w->bns, w->pac, task->rb, task->re, ref_buf);
refs->a[j].l_ref = ref_buf->n;
refs->a[j].ref = ref_buf->a;
// 2. 获取对应的seq
byte_v* seq_buf = &seq_bufs->a[j];
get_query_seq(task->is_rev, w->seqs[task->seq_id].l_seq, (uint8_t*)w->seqs[task->seq_id].seq, seq_buf);
seqs->a[j].l_seq = seq_buf->n;
seqs->a[j].seq = seq_buf->a;
// 3. 其他数据
xtras->a[j] = task->xtra;
kswrs->a[j].tb = kswrs->a[j].qb = -1;
}
for (; i < roundEndIdx; ++i, ++j) {
refs->a[j].l_ref = 0;
seqs->a[j].l_seq = 0;
xtras->a[j] = 0;
}
PROF_END(tprof[T_MSW_GET_REF][tid], msw_get_ref);
PROF_START(msw_1);
msw_avx512_u8(opt, endIdx - startIdx, msw_buf, refs, seqs, xtras, kswrs, tid);
PROF_END(tprof[T_MSW_1][tid], msw_1);
// 第二阶段计算
msw_ref_v* refs2 = &w->msw->t_msw_2_refs[tid];
msw_seq_v* seqs2 = &w->msw->t_msw_2_seqs[tid];
int_v* xtras2 = &w->msw->t_msw_2_xtras[tid];
msw_kswr_v* kswrs2 = &w->msw->t_msw_2_kswrs[tid];
int_v msw_task_ids = {0}; // 用于第二阶段任务统计
// 保存结果并筛选第二阶段msw计算的任务
for (i = startIdx, j = 0, k = 0; i < endIdx; ++i, ++j) {
msw_task_t* task = w->msw->p_msw_tasks_u8.a[i];
kswr_avx_t r = kswrs->a[j];
task->aln = r;
if ((task->xtra & KSW_XSTART) == 0 || ((task->xtra & KSW_XSUBO) && r.score < (task->xtra & 0xffff)))
continue;
task->xtra = KSW_XSTOP | task->aln.score;
kv_push(int, msw_task_ids, i);
// 1. 获取对应的ref
refs2->a[k] = refs->a[j];
revseq(r.te + 1, refs2->a[k].ref);
// 2. 获取对应的seq
seqs2->a[k] = seqs->a[j];
revseq(r.qe + 1, seqs2->a[k].seq);
seqs2->a[k].l_seq = r.qe + 1;
// 3. 其他数据
xtras->a[k] = task->xtra;
kswrs2->a[k] = r;
++k;
}
roundEndIdx = ((msw_task_ids.n + SIMD512_WIDTH8 - 1) / SIMD512_WIDTH8) * SIMD512_WIDTH8;
for (; k < roundEndIdx; ++k) {
refs2->a[k].l_ref = 0;
seqs2->a[k].l_seq = 0;
xtras2->a[k] = 0;
}
PROF_START(msw_2);
msw_avx512_u8(opt, msw_task_ids.n, msw_buf, refs2, seqs2, xtras2, kswrs2, tid);
PROF_END(tprof[T_MSW_2][tid], msw_2);
// 结果赋值
for (k = 0; k < msw_task_ids.n; ++k) {
i = msw_task_ids.a[k];
msw_task_t* task = w->msw->p_msw_tasks_u8.a[i];
task->aln = kswrs2->a[k];
}
kv_destroy(msw_task_ids);
#endif
}
static void worker_calc_matesw16(void* data, int i, int tid) {}
static void worker_calc_matesw_avx512_i16(void* data, int i, int tid) {}
// 检测添加新的align
static int check_add_align(kswr_avx_t aln, int min_seed_len, int is_rev, int64_t l_pac, mem_alnreg_t* a, int l_ms, uint8_t* ms, mem_alnreg_v* ma,
int64_t rb) {
int res = 0;
if (aln.score >= min_seed_len && aln.qb >= 0) { // something goes wrong if aln.qb < 0
int i, tmp;
mem_alnreg_t b;
memset(&b, 0, sizeof(mem_alnreg_t));
b.rid = a->rid;
b.is_alt = a->is_alt;
b.qb = is_rev ? l_ms - (aln.qe + 1) : aln.qb;
b.qe = is_rev ? l_ms - aln.qb : aln.qe + 1;
b.rb = is_rev ? (l_pac << 1) - (rb + aln.te + 1) : rb + aln.tb;
b.re = is_rev ? (l_pac << 1) - (rb + aln.tb) : rb + aln.te + 1;
b.score = aln.score;
b.csub = aln.score2;
b.secondary = -1;
b.seedcov = (b.re - b.rb < b.qe - b.qb ? b.re - b.rb : b.qe - b.qb) >> 1;
// printf("*** %d, [%lld,%lld], %d:%d, (%lld,%lld), (%lld,%lld) == (%lld,%lld)\n", aln.score, rb, re, is_rev,
// is_larger, a->rb, a->re, ma->a[0].rb, ma->a[0].re, b.rb, b.re);
kv_push(mem_alnreg_t, *ma, b); // make room for a new element
// move b s.t. ma is sorted
for (i = 0; i < ma->n - 1; ++i) // find the insertion point
if (ma->a[i].score < b.score)
break;
tmp = i;
for (i = ma->n - 1; i > tmp; --i) ma->a[i] = ma->a[i - 1];
ma->a[i] = b;
res = 1;
}
return res;
}
#define raw_mapq(diff, a) ((int)(6.02 * (diff) / (a) + .499))
// 最后再计算并生成sam数据
static void workder_gen_sam(void* data, int i, int tid) {
static void workder_gen_sam(void* data, int idx, int tid) {
mem_worker_t* w = (mem_worker_t*)data;
free(w->regs[i << 1 | 0].a);
free(w->regs[i << 1 | 1].a);
const mem_opt_t* opt = w->opt;
const bntseq_t* bns = w->bns;
const uint8_t* pac = w->pac;
const mem_pestat_t *pes = w->pes;
int startIdx = idx * opt->batch_size;
int endIdx = (idx + 1) * opt->batch_size;
if (endIdx > w->n_reads)
endIdx = w->n_reads;
int id = 0, i = 0, j = 0, k = 0;
for (id = startIdx; id < endIdx; id += 2) {
// 初始化变量
bseq1_t* s = &w->seqs[id];
mem_alnreg_v* a = &w->regs[id];
seq_sam_t* ss = &w->sams[id];
int z[2], o, subo, n_sub, extra_flag = 1, n_pri[2], n_aa[2];
mem_aln_t h[2], g[2], aa[2][2];
memset(h, 0, sizeof(mem_aln_t) * 2);
memset(g, 0, sizeof(mem_aln_t) * 2);
n_aa[0] = n_aa[1] = 0;
for (i = 0; i < 2; ++i) {
int si = !i;
int n = 0;
// 这里应该先给task排序因为u8和i16是分开排序的需要合在一起
for (k = 0; k < s[si].msw.n; ++k) {
msw_task_t* t = s[si].msw.a[k];
n += check_add_align(t->aln, opt->min_seed_len, t->is_rev, bns->l_pac, &a[!si].a[t->aj], s[si].l_seq, (uint8_t*)s[si].seq, &a[si], t->rb);
}
if (n > 0) {
a[si].n = mem_sort_dedup_patch(opt, 0, 0, 0, a[si].n, a[si].a);
}
}
////////////////////////////////////////////////////////////
n_pri[0] = mem_mark_primary_se(opt, a[0].n, a[0].a, id << 1 | 0);
n_pri[1] = mem_mark_primary_se(opt, a[1].n, a[1].a, id << 1 | 1);
if (opt->flag & MEM_F_PRIMARY5) {
mem_reorder_primary5(opt->T, &a[0]);
mem_reorder_primary5(opt->T, &a[1]);
}
if (opt->flag & MEM_F_NOPAIRING)
goto no_pairing;
// pairing single-end hits
if (n_pri[0] && n_pri[1] && (o = mem_pair(opt, bns, pac, pes, s, a, id, &subo, &n_sub, z, n_pri)) > 0) {
int is_multi[2], q_pe, score_un, q_se[2];
char** XA[2];
// check if an end has multiple hits even after mate-SW
for (i = 0; i < 2; ++i) {
for (j = 1; j < n_pri[i]; ++j)
if (a[i].a[j].secondary < 0 && a[i].a[j].score >= opt->T)
break;
is_multi[i] = j < n_pri[i] ? 1 : 0;
}
if (is_multi[0] || is_multi[1])
goto no_pairing; // TODO: in rare cases, the true hit may be long but with low score
// compute mapQ for the best SE hit
score_un = a[0].a[0].score + a[1].a[0].score - opt->pen_unpaired;
// q_pe = o && subo < o? (int)(MEM_MAPQ_COEF * (1. - (double)subo / o) * log(a[0].a[z[0]].seedcov + a[1].a[z[1]].seedcov) + .499) : 0;
subo = subo > score_un ? subo : score_un;
q_pe = raw_mapq(o - subo, opt->a);
if (n_sub > 0)
q_pe -= (int)(4.343 * log(n_sub + 1) + .499);
if (q_pe < 0)
q_pe = 0;
if (q_pe > 60)
q_pe = 60;
q_pe = (int)(q_pe * (1. - .5 * (a[0].a[0].frac_rep + a[1].a[0].frac_rep)) + .499);
// the following assumes no split hits
if (o > score_un) { // paired alignment is preferred
mem_alnreg_t* c[2];
c[0] = &a[0].a[z[0]];
c[1] = &a[1].a[z[1]];
for (i = 0; i < 2; ++i) {
if (c[i]->secondary >= 0)
c[i]->sub = a[i].a[c[i]->secondary].score, c[i]->secondary = -2;
q_se[i] = mem_approx_mapq_se(opt, c[i]);
}
q_se[0] = q_se[0] > q_pe ? q_se[0] : q_pe < q_se[0] + 40 ? q_pe : q_se[0] + 40;
q_se[1] = q_se[1] > q_pe ? q_se[1] : q_pe < q_se[1] + 40 ? q_pe : q_se[1] + 40;
extra_flag |= 2;
// cap at the tandem repeat score
q_se[0] = q_se[0] < raw_mapq(c[0]->score - c[0]->csub, opt->a) ? q_se[0] : raw_mapq(c[0]->score - c[0]->csub, opt->a);
q_se[1] = q_se[1] < raw_mapq(c[1]->score - c[1]->csub, opt->a) ? q_se[1] : raw_mapq(c[1]->score - c[1]->csub, opt->a);
} else { // the unpaired alignment is preferred
z[0] = z[1] = 0;
q_se[0] = mem_approx_mapq_se(opt, &a[0].a[0]);
q_se[1] = mem_approx_mapq_se(opt, &a[1].a[0]);
}
for (i = 0; i < 2; ++i) {
int k = a[i].a[z[i]].secondary_all;
if (k >= 0 && k < n_pri[i]) { // switch secondary and primary if both of them are non-ALT
assert(a[i].a[k].secondary_all < 0);
for (j = 0; j < a[i].n; ++j)
if (a[i].a[j].secondary_all == k || j == k)
a[i].a[j].secondary_all = z[i];
a[i].a[z[i]].secondary_all = -1;
}
}
if (!(opt->flag & MEM_F_ALL)) {
for (i = 0; i < 2; ++i) XA[i] = mem_gen_alt(opt, bns, pac, &a[i], s[i].l_seq, s[i].seq);
} else
XA[0] = XA[1] = 0;
// write SAM
for (i = 0; i < 2; ++i) {
h[i] = mem_reg2aln(opt, bns, pac, s[i].l_seq, s[i].seq, &a[i].a[z[i]]);
h[i].mapq = q_se[i];
h[i].flag |= 0x40 << i | extra_flag;
h[i].XA = XA[i] ? XA[i][z[i]] : 0;
aa[i][n_aa[i]++] = h[i];
if (n_pri[i] < a[i].n) { // the read has ALT hits
mem_alnreg_t* p = &a[i].a[n_pri[i]];
if (p->score < opt->T || p->secondary >= 0 || !p->is_alt)
continue;
g[i] = mem_reg2aln(opt, bns, pac, s[i].l_seq, s[i].seq, p);
g[i].flag |= 0x800 | 0x40 << i | extra_flag;
g[i].XA = XA[i] ? XA[i][n_pri[i]] : 0;
aa[i][n_aa[i]++] = g[i];
}
}
ss[0].sam.l = 0;
//PROF_START(aln2sam);
for (i = 0; i < n_aa[0]; ++i) mem_aln2sam(opt, bns, &ss[0].sam, &s[0], n_aa[0], aa[0], i, &h[1]); // write read1 hits
ss[1].sam.l = 0;
for (i = 0; i < n_aa[1]; ++i) mem_aln2sam(opt, bns, &ss[1].sam, &s[1], n_aa[1], aa[1], i, &h[0]); // write read2 hits
if (strcmp(s[0].name, s[1].name) != 0)
err_fatal(__func__, "paired reads have different names: \"%s\", \"%s\"\n", s[0].name, s[1].name);
// free
for (i = 0; i < 2; ++i) {
free(h[i].cigar);
free(g[i].cigar);
if (XA[i] == 0)
continue;
for (j = 0; j < a[i].n; ++j) free(XA[i][j]);
free(XA[i]);
}
} else
goto no_pairing;
no_pairing:
for (i = 0; i < 2; ++i) {
int which = -1;
if (a[i].n) {
if (a[i].a[0].score >= opt->T)
which = 0;
else if (n_pri[i] < a[i].n && a[i].a[n_pri[i]].score >= opt->T)
which = n_pri[i];
}
if (which >= 0)
h[i] = mem_reg2aln(opt, bns, pac, s[i].l_seq, s[i].seq, &a[i].a[which]);
else
h[i] = mem_reg2aln(opt, bns, pac, s[i].l_seq, s[i].seq, 0);
}
if (!(opt->flag & MEM_F_NOPAIRING) && h[0].rid == h[1].rid &&
h[0].rid >= 0) { // if the top hits from the two ends constitute a proper pair, flag it.
int64_t dist;
int d;
d = mem_infer_dir(bns->l_pac, a[0].a[0].rb, a[1].a[0].rb, &dist);
if (!pes[d].failed && dist >= pes[d].low && dist <= pes[d].high)
extra_flag |= 2;
}
mem_reg2sam(opt, bns, pac, &s[0], &a[0], 0x41 | extra_flag, &h[1], &ss[0]);
mem_reg2sam(opt, bns, pac, &s[1], &a[1], 0x81 | extra_flag, &h[0], &ss[1]);
if (strcmp(s[0].name, s[1].name) != 0)
err_fatal(__func__, "paired reads have different names: \"%s\", \"%s\"\n", s[0].name, s[1].name);
free(h[0].cigar);
free(h[1].cigar);
_destory_clear_vec(s[0].msw);
_destory_clear_vec(s[1].msw);
free(a[0].a);
free(a[1].a);
}
}
// 划分matesw任务
static void gather_matesw_task(mem_worker_t* w, matesw_data_v** msw_arr, matesw_ptr_v* tasks) {
tasks->n = 0;
static void gather_matesw_task(mem_worker_t* w, msw_task_v* thread_tasks, msw_task_ptr_v* tasks) {
_clear_vec(*tasks);
int i = 0, j = 0;
for (i = 0; i < w->opt->n_threads; ++i) {
for (j = 0; j < msw_arr[i]->n; ++j)
kv_push(matesw_data_t*, *tasks, &kv_A(*msw_arr[i], j));
for (j = 0; j < thread_tasks[i].n; ++j) {
msw_task_t* tp = &thread_tasks[i].a[j];
kv_push(msw_task_t*, *tasks, tp);
kv_push(msw_task_t*, w->seqs[tp->seq_id].msw, tp);
}
}
}
// 更新stats
static void update_mem_stats(mem_worker_t* w) {
static void update_msw_stats(mem_worker_t* w) {
int i = 0, max_seq_len = 0, max_ref_len = 0;
for (i = 0; i < w->opt->n_threads; ++i) {
max_seq_len = max_(max_seq_len, w->mem_stats[i].max_seq_len);
max_ref_len = max_(max_ref_len, w->mem_stats[i].max_msw_ref_len);
}
int quanta = (max_seq_len + 16 - 1) / 16; // based on SSE-8 bit lane
quanta *= 16;
max_seq_len = quanta + 1;
for (i = 0; i < w->opt->n_threads; ++i) {
w->mem_stats[i].max_seq_len = max_seq_len;
w->mem_stats[i].max_msw_ref_len = max_ref_len;
max_ref_len = max_(max_ref_len, w->msw->t_msw_stats[i].max_ref_len);
max_seq_len = max_(max_seq_len, w->msw->t_msw_stats[i].max_seq_len);
}
int quanta = ((max_seq_len + 16 - 1) / 16) * 16; // based on SSE-8 bit lane
max_seq_len = quanta + 1; // 这里需要加一因为ksw_avx512里赋值的时候是 <= ncol
w->msw->p_msw_stats.max_ref_len = max_ref_len + 1;
w->msw->p_msw_stats.max_seq_len = max_seq_len;
}
// 开辟缓冲区
static void alloc_update_cache_avx512(mem_worker_t* w) {
int i = 0, j = 0;
int i = 0;
for (i = 0; i < w->opt->n_threads; ++i) {
// 检查ref缓存空间
byte_vv* refs = &w->msw_ref[i];
if (refs->n < w->opt->msw_batch_size) { // 初始化ref缓存
for (j = refs->n; j < w->opt->msw_batch_size; ++j) {
byte_v *p = kv_pushp(byte_v, *refs);
kv_init(*p);
}
}
msw_ref_v* refs2 = &w->msw_2_ref[i];
if (refs2->n < w->opt->msw_batch_size) { // 初始化seq缓存
for (j = refs2->n; j < w->opt->msw_batch_size; ++j) {
msw_ref_t* p = kv_pushp(msw_ref_t, *refs2);
p->l_ref = 0;
p->ref = 0;
}
}
// 检查seq缓存空间
msw_seq_v* seqs = &w->msw_seq[i];
if (seqs->n < w->opt->msw_batch_size) { // 初始化seq缓存
for (j = seqs->n; j < w->opt->msw_batch_size; ++j) {
msw_seq_t* p = kv_pushp(msw_seq_t, *seqs);
p->l_seq = 0;
p->seq = 0;
}
}
msw_seq_v* seqs2 = &w->msw_2_seq[i];
if (seqs2->n < w->opt->msw_batch_size) { // 初始化seq缓存
for (j = seqs2->n; j < w->opt->msw_batch_size; ++j) {
msw_seq_t* p = kv_pushp(msw_seq_t, *seqs2);
p->l_seq = 0;
p->seq = 0;
}
}
msw_kswr_v* kswrs = &w->msw_kswr[i]; // 初始化msw结果
if (kswrs->n < w->opt->msw_batch_size) {
for (j = kswrs->n; j < w->opt->msw_batch_size; ++j) {
kv_pushp(kswr_avx_t, *kswrs);
}
}
msw_kswr_v* kswrs2 = &w->msw_2_kswr[i]; // 初始化msw结果
if (kswrs2->n < w->opt->msw_batch_size) {
for (j = kswrs2->n; j < w->opt->msw_batch_size; ++j) {
kv_pushp(kswr_avx_t, *kswrs2);
}
}
int_v* xtras = &w->msw_xtra[i]; // 初始化xtra
if (xtras->n < w->opt->msw_batch_size) {
for (j = xtras->n; j < w->opt->msw_batch_size; ++j) {
kv_pushp(int, *xtras);
}
}
int_v* xtras2 = &w->msw_2_xtra[i]; // 初始化xtra
if (xtras2->n < w->opt->msw_batch_size) {
for (j = xtras2->n; j < w->opt->msw_batch_size; ++j) {
kv_pushp(int, *xtras2);
}
}
matesw_buf_t* b = &w->msw_buf[i];
msw_buf_t* b = &w->msw->t_msw_buf[i];
// 更新跟ref len有关的缓冲区
if (b->ref_len < w->mem_stats[i].max_msw_ref_len) {
b->ref_len = w->mem_stats[i].max_msw_ref_len;
if (b->ref_len < w->msw->p_msw_stats.max_ref_len) {
b->ref_len = w->msw->p_msw_stats.max_ref_len;
if (b->refArr) _mm_free(b->refArr);
if (b->rowMax) _mm_free(b->rowMax);
b->refArr = (uint8_t*)_mm_malloc(b->ref_len * SIMD512_WIDTH8 * sizeof(uint8_t), 64);
b->rowMax = (uint8_t*)_mm_malloc(b->ref_len * SIMD512_WIDTH8 * sizeof(uint8_t), 64);
for (j = 0; j < w->opt->msw_batch_size; ++j) {
byte_v* ref = &kv_A(*refs, j);
kv_resize(uint8_t, *ref, b->ref_len);
// ref->n = ref->m;
}
}
// 更新跟seq len有关的缓冲区
if (b->seq_len < w->mem_stats[i].max_seq_len) {
b->seq_len = w->mem_stats[i].max_seq_len;
if (b->seq_len < w->msw->p_msw_stats.max_seq_len) {
b->seq_len = w->msw->p_msw_stats.max_seq_len;
if (b->seqArr) _mm_free(b->seqArr);
if (b->H0) _mm_free(b->H0);
if (b->H1) _mm_free(b->H1);
@ -476,37 +625,45 @@ static void alloc_update_cache_avx512(mem_worker_t* w) {
void gen_paired_sam(mem_worker_t* w) {
int i = 0;
for (i = 0; i < w->opt->n_threads; ++i) {
w->matesw_arr8[i]->n = 0; // 清空之前的数据
w->matesw_arr16[i]->n = 0;
w->msw->t_msw_tasks_u8[i].n = 0; // 清空之前的数据
w->msw->t_msw_tasks_i16[i].n = 0;
}
if (w->opt->flag & MEM_F_NO_RESCUE) {
kt_for(w->opt->n_threads, worker_sam, w, w->n_reads >> 1); // generate alignment
} else {
int batch_n = (w->n_reads + w->opt->batch_size - 1) / w->opt->batch_size;
// 1. 计算哪些read需要matesw
PROF_START(get_matesw_data);
kt_for(w->opt->n_threads, worker_matesw_data, w, batch_n);
kt_for(w->opt->n_threads, worker_matesw_tasks, w, batch_n);
PROF_END(gprof[G_get_matesw_data], get_matesw_data);
// 更新stats
PROF_START(update_stats_cache);
update_mem_stats(w);
update_msw_stats(w);
// 开辟缓冲区
alloc_update_cache_avx512(w);
PROF_END(gprof[G_update_stats_cache], update_stats_cache);
// 2. matesw计算过程
// 2. 收集每个线程中的msw任务
PROF_START(gather_matesw_task);
gather_matesw_task(w, w->matesw_arr8, &w->msw_tasks8);
gather_matesw_task(w, w->matesw_arr16, &w->msw_tasks16);
gather_matesw_task(w, w->msw->t_msw_tasks_u8, &w->msw->p_msw_tasks_u8);
gather_matesw_task(w, w->msw->t_msw_tasks_i16, &w->msw->p_msw_tasks_i16);
PROF_END(gprof[G_gather_matesw_task], gather_matesw_task);
PROF_START(calc_matesw);
int msw_batch_n = (w->msw_tasks8.n + w->opt->msw_batch_size - 1) / w->opt->msw_batch_size;
if (w->msw_tasks8.n > 0)
int msw_batch_n = (w->msw->p_msw_tasks_u8.n + w->opt->msw_batch_size - 1) / w->opt->msw_batch_size;
// 3. 处理msw任务
if (w->msw->p_msw_tasks_u8.n > 0)
kt_for(w->opt->n_threads, worker_calc_matesw_avx512_u8, w, msw_batch_n);
if (w->msw_tasks16.n > 0)
kt_for(w->opt->n_threads, worker_calc_matesw16, w, msw_batch_n);
if (w->msw->p_msw_tasks_i16.n > 0)
kt_for(w->opt->n_threads, worker_calc_matesw_avx512_i16, w, msw_batch_n);
PROF_END(gprof[G_calc_matesw], calc_matesw);
// 3.
kt_for(w->opt->n_threads, workder_gen_sam, w, w->n_reads >> 1);
// 4. 生成sam
kt_for(w->opt->n_threads, workder_gen_sam, w, batch_n);
}
fprintf(stderr, "zzh %d : 8: %ld 16: %ld\n", i, w->msw_tasks8.n, w->msw_tasks16.n);
fprintf(stderr, "zzh %d : 8: %ld 16: %ld\n", i, w->msw->p_msw_tasks_u8.n, w->msw->p_msw_tasks_i16.n);
}

View File

@ -14,6 +14,8 @@
#define SIMD512_WIDTH8 64
#define SIMD512_WIDTH16 32
void gen_paired_sam(mem_worker_t* w);
extern int mem_sam_pe(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, const mem_pestat_t pes[4], uint64_t id, bseq1_t s[2],
@ -21,4 +23,22 @@ extern int mem_sam_pe(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t*
extern void mem_reg2ovlp(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, bseq1_t* s, mem_alnreg_v* a);
extern void kt_for(int n_threads, void (*func)(void*, int, int), void* data, int n);
extern void kt_for(int n_threads, void (*func)(void*, int, int), void* data, int n);
extern int mem_sort_dedup_patch(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, uint8_t* query, int n, mem_alnreg_t* a);
extern void mem_reg2sam(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, bseq1_t* s, mem_alnreg_v* a, int extra_flag,
const mem_aln_t* m, seq_sam_t* ss);
extern void mem_aln2sam(const mem_opt_t* opt, const bntseq_t* bns, kstring_t* str, bseq1_t* s, int n, const mem_aln_t* list, int which,
const mem_aln_t* m);
extern int mem_mark_primary_se(const mem_opt_t* opt, int n, mem_alnreg_t* a, int64_t id);
extern int mem_approx_mapq_se(const mem_opt_t* opt, const mem_alnreg_t* a);
extern char** mem_gen_alt(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, const mem_alnreg_v* a, int l_query, const char* query);
extern void mem_reorder_primary5(int T, mem_alnreg_v* a);
extern int mem_pair(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, const mem_pestat_t pes[4], bseq1_t s[2], mem_alnreg_v a[2], int id,
int* sub, int* n_sub, int z[2], int n_pri[2]);

4
run.sh
View File

@ -7,8 +7,8 @@ n2=~/data/dataset/D2/n2.fq.gz
reference=~/data/reference/fmt/human_g1k_v37_decoy.fasta
out=/dev/null
#out=./out.sam
#out=/dev/null
out=./D2-out.sam
prog=./fastalign
#prog=/home/zzh/fastbwa

12
utils.h
View File

@ -34,6 +34,18 @@
#include "profiling.h"
#include "kvec.h"
#define _destory_clear_vec(arr) \
do { \
free((arr).a); \
(arr).a = 0; \
(arr).m = (arr).n = 0; \
} while (0)
#define _clear_vec(arr) \
do { \
(arr).n = 0; \
} while (0)
#undef MAX
#undef MIN
#define MAX(x, y) ((x) > (y) ? (x) : (y))