From 4c5e1e2fa17dff31d2ac5c1459103f40ce18bc7e Mon Sep 17 00:00:00 2001 From: Zhang Zhonghai Date: Tue, 13 Jan 2026 23:37:06 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E6=88=90=E4=BA=86matesw=E7=9A=84?= =?UTF-8?q?=E4=BB=A3=E7=A0=81=EF=BC=8C=E4=BD=86=E6=98=AF=E8=BF=98=E6=9C=89?= =?UTF-8?q?bug=EF=BC=8C=E7=BB=93=E6=9E=9C=E4=B8=8D=E4=B8=80=E8=87=B4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + .vscode/launch.json | 4 +- Makefile | 6 +- bwa.h | 9 +- bwamem.c | 9 +- bwamem.h | 14 +- ksw_align_avx.h | 5 +- ksw_align_avx512.c | 2 +- mate_sw.c | 51 ++++ mate_sw.h | 88 ++++++ paired_sam.c | 689 +++++++++++++++++++++++++++----------------- paired_sam.h | 22 +- run.sh | 4 +- utils.h | 12 + 14 files changed, 631 insertions(+), 285 deletions(-) create mode 100644 mate_sw.c create mode 100644 mate_sw.h diff --git a/.gitignore b/.gitignore index 72181d9..dab5627 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ *.log +*.sam # Prerequisites *.d diff --git a/.vscode/launch.json b/.vscode/launch.json index 86d6645..7bc316a 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -13,7 +13,7 @@ "args": [ "mem", "-t", - "1", + "32", "-M", "-R", "'@RG\\tID:normal\\tSM:normal\\tPL:illumina\\tLB:normal\\tPG:bwa'", @@ -21,7 +21,7 @@ "~/data/dataset/D1/n1.fq.gz", "~/data/dataset/D1/n2.fq.gz", "-o", - "/dev/null", + "D1-out.sam", ], "cwd": "${workspaceFolder}", // 当前工作路径:当前文件所在的工作空间 }, diff --git a/Makefile b/Makefile index ac5935f..a3664a3 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ CC= gcc -CFLAGS= -g -Wall -Wno-unused-function -mavx2 -mavx512bw -O2 +CFLAGS= -g -Wall -Wno-unused-function -mavx2 -mavx512bw -O3 WRAP_MALLOC=-DUSE_MALLOC_WRAPPERS SHOW_PERF= -DSHOW_PERF @@ -16,7 +16,9 @@ AOBJS= bwashm.o bwase.o bwaseqio.o bwtgap.o bwtaln.o bamlite.o \ bwtsw2_core.o bwtsw2_main.o bwtsw2_aux.o bwt_lite.o \ bwtsw2_chain.o fastmap.o bwtsw2_pair.o profiling.o \ fmt_idx.o ksw_extend2_avx2.o ksw_extend2_avx2_u8.o \ - debug.o paired_sam.o ksw_align_avx2.o ksw_align_avx512.o + debug.o paired_sam.o ksw_align_avx2.o ksw_align_avx512.o \ + mate_sw.o + PROG= fastalign INCLUDES= LIBS= -lm -lz -lpthread -ldl diff --git a/bwa.h b/bwa.h index 989d049..9f7b7a3 100644 --- a/bwa.h +++ b/bwa.h @@ -35,6 +35,7 @@ #include "kvec.h" #include "ksw.h" #include "ksw_align_avx.h" +#include "mate_sw.h" #define BWA_IDX_BWT 0x1 #define BWA_IDX_BNS 0x2 @@ -62,7 +63,7 @@ typedef struct { uint8_t *mem; } bwaidx_t; - +#if 0 // 需要做mate sw的read typedef struct { int is_rev; // seq是否在反向互补链上 @@ -78,13 +79,15 @@ typedef struct { } matesw_data_v; typedef kvec_t(matesw_data_t*) matesw_ptr_v; +#endif typedef struct { int l_seq, id; int m_name, m_comment, m_seq, m_qual; char *name, *comment, *seq, *qual; - matesw_ptr_v msw; - kstring_t sam; + msw_task_ptr_v msw; + //int_v msw; + // kstring_t sam; } bseq1_t; typedef struct { diff --git a/bwamem.c b/bwamem.c index eabf10c..d29b1b0 100644 --- a/bwamem.c +++ b/bwamem.c @@ -1072,7 +1072,7 @@ void mem_reorder_primary5(int T, mem_alnreg_v *a) // TODO (future plan): group hits into a uint64_t[] array. This will be cleaner and more flexible void mem_reg2sam(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, bseq1_t *s, mem_alnreg_v *a, int extra_flag, const mem_aln_t *m, seq_sam_t *ss) { - extern char **mem_gen_alt(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, mem_alnreg_v *a, int l_query, const char *query); + extern char **mem_gen_alt(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_alnreg_v *a, int l_query, const char *query); // kstring_t str; kvec_t(mem_aln_t) aa; int k, l; @@ -1290,6 +1290,9 @@ mem_worker_t *init_mem_worker(const mem_opt_t *opt, const FMTIndex *fmt, const b w->smem_arr = malloc(i * sizeof(smem_v *)); w->chain_arr = malloc(i * sizeof(mem_chain_v *)); w->isize_arr = malloc(i * sizeof(uint64_v *)); + w->msw = calloc(1, sizeof(msw_data_t)); + init_msw_data(w->msw, opt->n_threads, opt->msw_batch_size); +#if 0 w->matesw_arr8 = malloc(i * sizeof(matesw_data_v*)); w->matesw_arr16 = malloc(i * sizeof(matesw_data_v*)); w->msw_buf = calloc(i, sizeof(matesw_buf_t)); @@ -1304,15 +1307,17 @@ mem_worker_t *init_mem_worker(const mem_opt_t *opt, const FMTIndex *fmt, const b w->msw_2_seq = calloc(i, sizeof(msw_seq_v)); w->msw_2_kswr = calloc(i, sizeof(msw_kswr_v)); w->msw_2_xtra = calloc(i, sizeof(int_v)); - +#endif for (i = 0; i < opt->n_threads; ++i) { w->aux[i] = smem_aux_init(); w->smem_arr[i] = malloc(opt->batch_size * sizeof(smem_v)); w->chain_arr[i] = malloc(opt->batch_size * sizeof(mem_chain_v)); w->isize_arr[i] = calloc(4, sizeof(uint64_v)); +#if 0 w->matesw_arr8[i] = calloc(1, sizeof(matesw_data_v)); w->matesw_arr16[i] = calloc(1, sizeof(matesw_data_v)); w->msw_2_arr8[i] = calloc(1, sizeof(matesw_data_t*)); +#endif for (j = 0; j < opt->batch_size; ++j) { kv_init(w->smem_arr[i][j].mem); kv_init(w->smem_arr[i][j].pos_arr); diff --git a/bwamem.h b/bwamem.h index 8ab06d5..ee78b65 100644 --- a/bwamem.h +++ b/bwamem.h @@ -27,12 +27,12 @@ #ifndef BWAMEM_H_ #define BWAMEM_H_ -#include "bwt.h" #include "bntseq.h" #include "bwa.h" +#include "bwt.h" #include "fmt_idx.h" -#include "utils.h" #include "ksw_align_avx.h" +#include "utils.h" #define MEM_MAPQ_COEF 30.0 #define MEM_MAPQ_MAX 60 @@ -154,6 +154,7 @@ typedef struct { uint64_v pos_arr; } smem_v; +#if 0 typedef struct { int start_tid; int start_idx; @@ -168,8 +169,6 @@ typedef struct { int max_msw_ref_len; // mate sw里的最长的ref长度 } mem_stats_t; // 记录一些用到的数据统计,比如最长ref长度,最长seq长度等 -typedef kvec_t(byte_v) byte_vv; - typedef struct { uint8_t* seq; // 只是指向seq的指针 int l_seq; // seq长度 @@ -186,6 +185,8 @@ typedef kvec_t(msw_ref_t) msw_ref_v; typedef kvec_t(kswr_avx_t) msw_kswr_v; +#endif + typedef struct { int calc_isize; const mem_opt_t *opt; @@ -201,6 +202,10 @@ typedef struct { mem_chain_v **chain_arr; mem_alnreg_v *regs; uint64_v **isize_arr; + + msw_data_t *msw; + +#if 0 matesw_data_v** matesw_arr8; // 收集需要做mate sw的任务的数据 matesw_ptr_v** msw_2_arr8; // 第二阶段任务,每个线程单独收集 matesw_data_v** matesw_arr16; @@ -222,6 +227,7 @@ typedef struct { msw_seq_v* msw_2_seq; msw_kswr_v* msw_2_kswr; int_v* msw_2_xtra; +#endif int64_t n_processed; int64_t n; // regs元素个数,可动态扩展 diff --git a/ksw_align_avx.h b/ksw_align_avx.h index 0ff01c2..ea3ea64 100644 --- a/ksw_align_avx.h +++ b/ksw_align_avx.h @@ -6,6 +6,7 @@ #include "ksw.h" + #define AMBIG_ 4 // ambiguous base // for 16 bit #define DUMMY1_ 4 @@ -40,7 +41,7 @@ typedef struct { uint8_t* rowMax; uint8_t* seqArr; uint8_t* refArr; -} matesw_buf_t; // inter-query算法的mate sw计算过程需要用到的缓存空间 +} msw_buf_t; // inter-query算法的mate sw计算过程需要用到的缓存空间 #ifdef __cplusplus extern "C" { @@ -56,7 +57,7 @@ void ksw_align_avx512_u8(int8_t w_match, // match分数,正数 int8_t e_ins, // 延续一个insert罚分,正数 int8_t o_del, // 开始一个delete罚分,正数 int8_t e_del, // 延续一个delete罚分,正数 - matesw_buf_t* cache, // 计算用到的一些数据 + msw_buf_t* cache, // 计算用到的一些数据 uint8_t* seq1SoA, // ref序列,已经pack好了 uint8_t* seq2SoA, // seq序列 int16_t nrow, // 最长的行数,对应ref长度 diff --git a/ksw_align_avx512.c b/ksw_align_avx512.c index 320e6da..bf0a386 100644 --- a/ksw_align_avx512.c +++ b/ksw_align_avx512.c @@ -62,7 +62,7 @@ void ksw_align_avx512_u8(int8_t w_match, // match分数,正数 int8_t e_ins, // 延续一个insert罚分,正数 int8_t o_del, // 开始一个delete罚分,正数 int8_t e_del, // 延续一个delete罚分,正数 - matesw_buf_t* cache, // 计算用到的一些数据 + msw_buf_t* cache, // 计算用到的一些数据 uint8_t* seq1SoA, // ref序列,已经pack好了 uint8_t* seq2SoA, // seq序列 int16_t nrow, // 最长的行数,对应ref长度 diff --git a/mate_sw.c b/mate_sw.c new file mode 100644 index 0000000..285c9e7 --- /dev/null +++ b/mate_sw.c @@ -0,0 +1,51 @@ +#include "mate_sw.h" + + +// 初始化mate sw计算相关的数据 +void init_msw_data(msw_data_t* msw, int n_threads, int msw_batch_size) { +#define _alloc_msw(data, data_type) ((data) = calloc(n_threads, sizeof(data_type))) + + _alloc_msw(msw->t_msw_tasks_u8, msw_task_v); + _alloc_msw(msw->t_msw_tasks_i16, msw_task_v); + _alloc_msw(msw->t_msw_buf, msw_buf_t); + _alloc_msw(msw->t_msw_stats, msw_stats_t); + _alloc_msw(msw->t_msw_ref_buf, byte_vv); + _alloc_msw(msw->t_msw_seq_buf, byte_vv); + + _alloc_msw(msw->t_msw_1_refs, msw_ref_v); + _alloc_msw(msw->t_msw_1_seqs, msw_seq_v); + _alloc_msw(msw->t_msw_1_xtras, int_v); + _alloc_msw(msw->t_msw_1_kswrs, msw_kswr_v); + + _alloc_msw(msw->t_msw_2_refs, msw_ref_v); + _alloc_msw(msw->t_msw_2_seqs, msw_seq_v); + _alloc_msw(msw->t_msw_2_xtras, int_v); + _alloc_msw(msw->t_msw_2_kswrs, msw_kswr_v); + + // 初始化缓存空间 + int i = 0, j = 0; + msw_ref_t init_ref = {0}; + msw_seq_t init_seq = {0}; + kswr_avx_t init_kswr = {0}; + byte_v init_byte_v = {0}; + + for (i = 0; i < n_threads; ++i) { + for (j = 0; j < msw_batch_size; ++j) { + // for ref seq buf + kv_push(byte_v, msw->t_msw_ref_buf[i], init_byte_v); + kv_push(byte_v, msw->t_msw_seq_buf[i], init_byte_v); + + // for msw第一阶段 + kv_push(msw_ref_t, msw->t_msw_1_refs[i], init_ref); + kv_push(msw_seq_t, msw->t_msw_1_seqs[i], init_seq); + kv_push(int, msw->t_msw_1_xtras[i], 0); + kv_push(kswr_avx_t, msw->t_msw_1_kswrs[i], init_kswr); + + // for msw第二阶段 + kv_push(msw_ref_t, msw->t_msw_2_refs[i], init_ref); + kv_push(msw_seq_t, msw->t_msw_2_seqs[i], init_seq); + kv_push(int, msw->t_msw_2_xtras[i], 0); + kv_push(kswr_avx_t, msw->t_msw_2_kswrs[i], init_kswr); + } + } +} \ No newline at end of file diff --git a/mate_sw.h b/mate_sw.h new file mode 100644 index 0000000..12a42ba --- /dev/null +++ b/mate_sw.h @@ -0,0 +1,88 @@ +/* + Description: 计算mate sw过程用到的一些数据和函数 + + Copyright : All right reserved by ICT + + Author : Zhang Zhonghai + Date : 2026/01/12 +*/ + +#pragma once + +#include + +#include "ksw_align_avx.h" +#include "kvec.h" + +// 需要做mate sw的read +typedef struct { + int is_rev; // seq是否在反向互补链上 + int xtra; + int64_t rb, re; // ref的起始截止位置,左闭右开 + int64_t seq_id; // 对应的当前数据块的seq id + int aj; // 对应的pair-end的第几个aln; + int to; // task order,因为u8和i16分开放入的,但是他们也是有顺序的,这个用来排序 + kswr_avx_t aln; +} msw_task_t; + +// mate sw task数组 +typedef kvec_t(msw_task_t) msw_task_v; + +// mate sw task的指针数组 +typedef kvec_t(msw_task_t*) msw_task_ptr_v; + +////////////////////////////////////////////// + +// 用来存放获取的mate sw阶段的ref序列 +typedef kvec_t(byte_v) byte_vv; + +// 记录一些用到的数据统计,比如最长ref长度,最长seq长度等 +typedef struct { + int max_seq_len; + int max_ref_len; // mate sw里的最长的ref长度 +} msw_stats_t; + +// 用于记录参与计算的query,只保存指针 +typedef struct { + uint8_t* seq; // 只是指向seq的指针 + int l_seq; // seq长度 +} msw_seq_t; +typedef kvec_t(msw_seq_t) msw_seq_v; + +// 用于记录参与计算的ref,只保存指针 +typedef struct { + uint8_t* ref; + int l_ref; +} msw_ref_t; +typedef kvec_t(msw_ref_t) msw_ref_v; + +// 记录结果 +typedef kvec_t(kswr_avx_t) msw_kswr_v; + +//////////////////////////////////////////// + +// mate sw相关的所有数据 +typedef struct { + msw_task_v* t_msw_tasks_u8; // 线程内收集需要做mate sw的任务的数据 + msw_task_v* t_msw_tasks_i16; + msw_task_ptr_v p_msw_tasks_u8; // 线程共享的mate sw task指针 + msw_task_ptr_v p_msw_tasks_i16; + msw_buf_t* t_msw_buf; // 每个线程一个,mate sw 缓存 + msw_stats_t* t_msw_stats; // 每个线程一个,统计信息 + msw_stats_t p_msw_stats; //只是用来开辟空间 + byte_vv* t_msw_ref_buf; // ref的缓存 + byte_vv* t_msw_seq_buf; // seq的缓存,因为有些seq需要反向互补,而且用了buf之后,第二阶段反转不需要再还原了 + + msw_ref_v* t_msw_1_refs; // msw 第一阶段数据 + msw_seq_v* t_msw_1_seqs; + int_v* t_msw_1_xtras; + msw_kswr_v* t_msw_1_kswrs; + + msw_ref_v* t_msw_2_refs; // msw 第一阶段数据 + msw_seq_v* t_msw_2_seqs; + int_v* t_msw_2_xtras; + msw_kswr_v* t_msw_2_kswrs; + +} msw_data_t; + +void init_msw_data(msw_data_t* msw, int n_threads, int msw_batch_size); \ No newline at end of file diff --git a/paired_sam.c b/paired_sam.c index 2f8014a..9804d23 100644 --- a/paired_sam.c +++ b/paired_sam.c @@ -1,5 +1,6 @@ #include "paired_sam.h" +#include #include #include @@ -53,9 +54,29 @@ static inline uint8_t* get_ref_seq(const bntseq_t* bns, const uint8_t* pac, int6 return buf->a; } +// 将query序列放入缓存 +static inline void get_query_seq(int is_rev, int l_seq, uint8_t *seq, byte_v *query) { + int i = 0; + if (query->m < l_seq) { + kv_resize(uint8_t, *query, l_seq); + } + query->n = l_seq; + if (is_rev) { + for (i = 0; i < l_seq; ++i) query->a[l_seq - 1 - i] = seq[i] < 4 ? 3 - seq[i] : 4; + } else { + for (i = 0; i < l_seq; ++i) query->a[i] = seq[i]; + } +} + +// 翻转seq +static inline void revseq(int l, uint8_t* s) { + int i, t; + for (i = 0; i < l >> 1; ++i) t = s[i], s[i] = s[l - 1 - i], s[l - 1 - i] = t; +} + // 计算当前seq是否需要做matesw,需要的话保存必要的数据 -static void get_matesw_data(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, const mem_pestat_t pes[4], const mem_alnreg_t* a, - bseq1_t* seq, mem_alnreg_v* ma, int64_t sid, mem_stats_t *stats, matesw_data_v* msw8, matesw_data_v* msw16) { +static void get_matesw_tasks(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, const mem_pestat_t pes[4], const mem_alnreg_t* a, int a_j, + bseq1_t* seq, mem_alnreg_v* ma, int64_t sid, msw_stats_t* stats, msw_task_v* msw8, msw_task_v* msw16) { int64_t l_pac = bns->l_pac; int l_ms = seq->l_seq; int i, r, skip[4], rid; @@ -69,6 +90,7 @@ static void get_matesw_data(const mem_opt_t* opt, const bntseq_t* bns, const uin } if (skip[0] + skip[1] + skip[2] + skip[3] == 4) return; // consistent pair exist; no need to perform SW + int task_order = 0; for (r = 0; r < 4; ++r) { int is_rev, is_larger; int64_t rb, re; // 左闭右开 @@ -90,25 +112,28 @@ static void get_matesw_data(const mem_opt_t* opt, const bntseq_t* bns, const uin re = l_pac << 1; rid = get_rid_range(bns, pac, &rb, (rb + re) >> 1, &re); // 计算ref对应的染色体id和区间起始终止位置 if (a->rid == rid && re - rb >= opt->min_seed_len) { // no funny things happening - stats->max_msw_ref_len = max_(stats->max_msw_ref_len, re - rb); + stats->max_ref_len = max_(stats->max_ref_len, re - rb); // fprintf(stderr, "zzh here\n"); int xtra = KSW_XSUBO | KSW_XSTART | (l_ms * opt->a < 250 ? KSW_XBYTE : 0) | (opt->min_seed_len * opt->a); - matesw_data_t* p; + msw_task_t* p; if ((xtra & KSW_XBYTE)) - p = kv_pushp(matesw_data_t, *msw8); + p = kv_pushp(msw_task_t, *msw8); else - p = kv_pushp(matesw_data_t, *msw16); + p = kv_pushp(msw_task_t, *msw16); p->is_rev = is_rev; p->xtra = xtra; p->rb = rb; p->re = re; p->seq_id = sid; - kv_push(matesw_data_t*, seq->msw, p); // 将matesw任务和对应的seq关联起来 + p->aj = a_j; + p->to = task_order++; + // kv_push(msw_task_t*, seq->msw, p); // 将matesw任务和对应的seq关联起来,这里放指针是不行的,因为指针位置会变,要保存offset才行 + // kv_push(int, seq->msw, msw8->n - 1); // 这里需要考虑i16, 本线程的msw的offset } } } // 先计算哪些read需要做matesw -static void worker_matesw_data(void* data, int idx, int tid) { +static void worker_matesw_tasks(void* data, int idx, int tid) { mem_worker_t* w = (mem_worker_t*)data; const mem_opt_t* opt = w->opt; int startIdx = idx * opt->batch_size; @@ -117,179 +142,65 @@ static void worker_matesw_data(void* data, int idx, int tid) { endIdx = w->n_reads; int id = 0; + mem_alnreg_v b[2]; + int_v aj[2]; + kv_init(aj[0]); + kv_init(aj[1]); + kv_init(b[0]); + kv_init(b[1]); for (id = startIdx; id < endIdx; id += 2) { int64_t i_s = id; bseq1_t* s = &w->seqs[i_s]; mem_alnreg_v* a = &w->regs[i_s]; s->msw.n = 0; // 清空之前的matesw数据 int i, j; - mem_alnreg_v b[2]; - kv_init(b[0]); - kv_init(b[1]); + + _clear_vec(b[0]); + _clear_vec(b[1]); + _clear_vec(aj[0]); + _clear_vec(aj[1]); // fprintf(stderr, "zzh test\n"); for (i = 0; i < 2; ++i) for (j = 0; j < a[i].n; ++j) - if (a[i].a[j].score >= a[i].a[0].score - opt->pen_unpaired) + if (a[i].a[j].score >= a[i].a[0].score - opt->pen_unpaired) { kv_push(mem_alnreg_t, b[i], a[i].a[j]); + kv_push(int, aj[i], j); + } for (i = 0; i < 2; ++i) for (j = 0; j < b[i].n && j < opt->max_matesw; ++j) - get_matesw_data(opt, w->bns, w->pac, w->pes, &b[i].a[j], &s[!i], &a[!i], i_s + !i, &w->mem_stats[tid], w->matesw_arr8[tid], - w->matesw_arr16[tid]); - free(b[0].a); - free(b[1].a); + get_matesw_tasks(opt, w->bns, w->pac, w->pes, &b[i].a[j], aj[i].a[j], &s[!i], &a[!i], i_s + !i, &w->msw->t_msw_stats[tid], + &w->msw->t_msw_tasks_u8[tid], &w->msw->t_msw_tasks_i16[tid]); } + free(b[0].a); + free(b[1].a); + free(aj[0].a); + free(aj[1].a); } -// 再多线程计算matesw,利用inter-query的simd并行 -static void worker_calc_matesw_avx512_u8(void* data, int idx, int tid) { - mem_worker_t* w = (mem_worker_t*)data; - const mem_opt_t* opt = w->opt; - int startIdx = idx * w->opt->msw_batch_size; - int endIdx = (idx + 1) * w->opt->msw_batch_size; - if (endIdx > w->msw_tasks8.n) - endIdx = w->msw_tasks8.n; - int roundEndIdx = ((endIdx + SIMD512_WIDTH8 - 1) / SIMD512_WIDTH8) * SIMD512_WIDTH8; - +static void msw_avx512_u8(const mem_opt_t* opt, int num_tasks, msw_buf_t* msw_buf, msw_ref_v* refs, msw_seq_v* seqs, int_v* xtras, + msw_kswr_v* kswrs, int tid) { int i = 0, j = 0, k = 0; int refLen[SIMD512_WIDTH8] = {0}; - matesw_buf_t* msw_buf = &w->msw_buf[tid]; // 缓冲区 - byte_vv* refs = &w->msw_ref[tid]; - msw_seq_v* seqs = &w->msw_seq[tid]; - int_v* xtras = &w->msw_xtra[tid]; - msw_kswr_v* kswrs = &w->msw_kswr[tid]; - - // 获取ref和seq数据 - PROF_START(msw_get_ref); - for (i = startIdx, j = 0; i < endIdx; ++i, ++j) { - matesw_data_t* task = kv_A(w->msw_tasks8, i); - // 1. 获取对应的ref - byte_v* ref = &kv_A(*refs, j); - get_ref_seq(w->bns, w->pac, task->rb, task->re, ref); - // 2. 获取对应的seq - msw_seq_t* seq = &kv_A(*seqs, j); - seq->seq = (uint8_t*)w->seqs[task->seq_id].seq; - seq->l_seq = w->seqs[task->seq_id].l_seq; - // 3. 其他数据 - xtras->a[j] = task->xtra; - } - for (; i < roundEndIdx; ++i, ++j) { - byte_v* ref = &kv_A(*refs, j); - ref->n = 0; - msw_seq_t* seq = &kv_A(*seqs, j); - seq->l_seq = 0; - xtras->a[j] = 0; - } - PROF_END(tprof[T_MSW_GET_REF][tid], msw_get_ref); - // 准备数据并进行计算 - for (i = 0; i < endIdx - startIdx; i += SIMD512_WIDTH8) { + for (i = 0; i < num_tasks; i += SIMD512_WIDTH8) { // 将ref和seq赋值给对应的用来计算的缓存 uint8_t* mySeq1SoA = msw_buf->refArr; uint8_t* mySeq2SoA = msw_buf->seqArr; int maxSeqLen = 0, maxRefLen = 0; - PROF_START(msw_pack_seq); - // 处理ref + // for test +#if 0 for (j = 0; j < SIMD512_WIDTH8; ++j) { - byte_v* ref = &refs->a[i + j]; - uint8_t* seq1 = ref->a; - refLen[j] = ref->n; - // 处理ref - for (k = 0; k < ref->n; ++k) { - mySeq1SoA[k * SIMD512_WIDTH8 + j] = (seq1[k] == AMBIG_ ? AMBR : seq1[k]); - } - maxRefLen = max_(maxRefLen, ref->n); + fprintf(stderr, "r: %d, s: %d\n", refs->a[i + j].l_ref, seqs->a[i + j].l_seq); + fflush(stderr); } - - for (j = 0; j < SIMD512_WIDTH8; ++j) { - byte_v* ref = &refs->a[i + j]; - for (k = ref->n; k < maxRefLen; ++k) { - mySeq1SoA[k * SIMD512_WIDTH8 + j] = 0xFF; - } - } - - // 处理query - for (j = 0; j < SIMD512_WIDTH8; ++j) { - msw_seq_t* seq = &kv_A(*seqs, i + j); - uint8_t* seq2 = seq->seq; - int quanta = ((seq->l_seq + 16 - 1) / 16) * 16; // based on SSE-8 bit lane - for (k = 0; k < seq->l_seq; k++) { - mySeq2SoA[k * SIMD512_WIDTH8 + j] = (seq2[k] == AMBIG_ ? AMBQ : seq2[k]); - } - for (; k < quanta; ++k) { - mySeq2SoA[k * SIMD512_WIDTH8 + j] = DUMMY5; // SSE quanta - } - maxSeqLen = max_(maxSeqLen, quanta); - } - for (j = 0; j < SIMD512_WIDTH8; ++j) { - msw_seq_t* seq = &kv_A(*seqs, i + j); - int quanta = ((seq->l_seq + 16 - 1) / 16) * 16; // based on SSE-8 bit lane - for (k = quanta; k < maxSeqLen; k++) { - mySeq2SoA[k * SIMD512_WIDTH8 + j] = 0xFF; - } - } - PROF_END(tprof[T_MSW_PACK_SEQ][tid], msw_pack_seq); - - // 利用smid指令计算 - PROF_START(msw_1); - ksw_align_avx512_u8(opt->a, -1 * opt->b, opt->o_ins, opt->e_ins, opt->o_del, opt->e_del, msw_buf, mySeq1SoA, mySeq2SoA, maxRefLen, maxSeqLen, - &xtras->a[i], refLen, &kswrs->a[i], 0); - PROF_END(tprof[T_MSW_1][tid], msw_1); - } - -#if 1 - msw_ref_v* refs2 = &w->msw_2_ref[tid]; - msw_seq_v* seqs2 = &w->msw_2_seq[tid]; - int_v* xtras2 = &w->msw_2_xtra[tid]; - msw_kswr_v* kswrs2 = &w->msw_2_kswr[tid]; - - int_v msw_task_ids; // 用于第二阶段任务统计 - kv_init(msw_task_ids); - // 保存结果,并筛选第二阶段msw计算的任务 - for (i = startIdx, j = 0, k = 0; i < endIdx; ++i, ++j) { - matesw_data_t* task = kv_A(w->msw_tasks8, i); - task->aln = kswrs->a[j]; - if ((task->xtra & KSW_XSTART) == 0 || ((task->xtra & KSW_XSUBO) && task->aln.score < (task->xtra & 0xffff))) - continue; - task->xtra = KSW_XSTOP | task->aln.score; - kv_push(int, msw_task_ids, j); - - // 1. 获取对应的ref - byte_v* ref = &kv_A(*refs, j); - msw_ref_t* ref2 = &kv_A(*refs2, k); - ref2->l_ref = ref->n; - ref2->ref = ref->a; - // 2. 获取对应的seq - msw_seq_t* seq = &kv_A(*seqs, j); - msw_seq_t* seq2 = &kv_A(*seqs2, k); - seq2->seq = seq->seq; - seq2->l_seq = seq->l_seq; - - // 3. 其他数据 - xtras2->a[k] = task->xtra; - ++k; - } - roundEndIdx = ((msw_task_ids.n + SIMD512_WIDTH8 - 1) / SIMD512_WIDTH8) * SIMD512_WIDTH8; - for (; k < roundEndIdx; ++k) { - msw_ref_t* ref2 = &kv_A(*refs2, k); - ref2->l_ref = 0; - msw_seq_t* seq2 = &kv_A(*seqs2, k); - seq2->l_seq = 0; - xtras2->a[k] = 0; - } - - // 准备数据并进行计算 - for (i = 0; i < msw_task_ids.n; i += SIMD512_WIDTH8) { - // 将ref和seq赋值给对应的用来计算的缓存 - uint8_t* mySeq1SoA = msw_buf->refArr; - uint8_t* mySeq2SoA = msw_buf->seqArr; - int maxSeqLen = 0, maxRefLen = 0; +#endif PROF_START(msw_pack_seq); // 处理ref for (j = 0; j < SIMD512_WIDTH8; ++j) { - msw_ref_t* ref = &refs2->a[i + j]; + msw_ref_t* ref = &refs->a[i + j]; uint8_t* seq1 = ref->ref; refLen[j] = ref->l_ref; // 处理ref @@ -300,15 +211,15 @@ static void worker_calc_matesw_avx512_u8(void* data, int idx, int tid) { } for (j = 0; j < SIMD512_WIDTH8; ++j) { - msw_ref_t* ref = &refs2->a[i + j]; - for (k = ref->l_ref; k < maxRefLen; ++k) { + msw_ref_t* ref = &refs->a[i + j]; + for (k = ref->l_ref; k <= maxRefLen; ++k) { mySeq1SoA[k * SIMD512_WIDTH8 + j] = 0xFF; } } // 处理query for (j = 0; j < SIMD512_WIDTH8; ++j) { - msw_seq_t* seq = &kv_A(*seqs2, i + j); + msw_seq_t* seq = &seqs->a[i + j]; uint8_t* seq2 = seq->seq; int quanta = ((seq->l_seq + 16 - 1) / 16) * 16; // based on SSE-8 bit lane for (k = 0; k < seq->l_seq; k++) { @@ -320,144 +231,382 @@ static void worker_calc_matesw_avx512_u8(void* data, int idx, int tid) { maxSeqLen = max_(maxSeqLen, quanta); } for (j = 0; j < SIMD512_WIDTH8; ++j) { - msw_seq_t* seq = &kv_A(*seqs2, i + j); + msw_seq_t* seq = &seqs->a[i + j]; int quanta = ((seq->l_seq + 16 - 1) / 16) * 16; // based on SSE-8 bit lane - for (k = quanta; k < maxSeqLen; k++) { + for (k = quanta; k <= maxSeqLen; k++) { mySeq2SoA[k * SIMD512_WIDTH8 + j] = 0xFF; } } PROF_END(tprof[T_MSW_PACK_SEQ][tid], msw_pack_seq); // 利用smid指令计算 - PROF_START(msw_2); ksw_align_avx512_u8(opt->a, -1 * opt->b, opt->o_ins, opt->e_ins, opt->o_del, opt->e_del, msw_buf, mySeq1SoA, mySeq2SoA, maxRefLen, maxSeqLen, - &xtras2->a[i], refLen, &kswrs2->a[i], 1); - PROF_END(tprof[T_MSW_2][tid], msw_2); + &xtras->a[i], refLen, &kswrs->a[i], 0); + } +} + +// 再多线程计算matesw,利用inter-query的simd并行 +static void worker_calc_matesw_avx512_u8(void* data, int idx, int tid) { + mem_worker_t* w = (mem_worker_t*)data; + const mem_opt_t* opt = w->opt; + int startIdx = idx * w->opt->msw_batch_size; + int endIdx = (idx + 1) * w->opt->msw_batch_size; + if (endIdx > w->msw->p_msw_tasks_u8.n) + endIdx = w->msw->p_msw_tasks_u8.n; + int roundEndIdx = ((endIdx + SIMD512_WIDTH8 - 1) / SIMD512_WIDTH8) * SIMD512_WIDTH8; + + int i = 0, j = 0, k = 0; + + msw_buf_t* msw_buf = &w->msw->t_msw_buf[tid]; // 缓冲区 + byte_vv* ref_bufs = &w->msw->t_msw_ref_buf[tid]; + byte_vv* seq_bufs = &w->msw->t_msw_seq_buf[tid]; + + msw_ref_v* refs = &w->msw->t_msw_1_refs[tid]; + msw_seq_v* seqs = &w->msw->t_msw_1_seqs[tid]; + int_v* xtras = &w->msw->t_msw_1_xtras[tid]; + msw_kswr_v* kswrs = &w->msw->t_msw_1_kswrs[tid]; + + // 获取ref和seq数据 + PROF_START(msw_get_ref); + for (i = startIdx, j = 0; i < endIdx; ++i, ++j) { + msw_task_t* task = w->msw->p_msw_tasks_u8.a[i]; + // 1. 获取对应的ref + byte_v* ref_buf = &ref_bufs->a[j]; + get_ref_seq(w->bns, w->pac, task->rb, task->re, ref_buf); + refs->a[j].l_ref = ref_buf->n; + refs->a[j].ref = ref_buf->a; + // 2. 获取对应的seq + byte_v* seq_buf = &seq_bufs->a[j]; + get_query_seq(task->is_rev, w->seqs[task->seq_id].l_seq, (uint8_t*)w->seqs[task->seq_id].seq, seq_buf); + seqs->a[j].l_seq = seq_buf->n; + seqs->a[j].seq = seq_buf->a; + // 3. 其他数据 + xtras->a[j] = task->xtra; + kswrs->a[j].tb = kswrs->a[j].qb = -1; + } + for (; i < roundEndIdx; ++i, ++j) { + refs->a[j].l_ref = 0; + seqs->a[j].l_seq = 0; + xtras->a[j] = 0; + } + PROF_END(tprof[T_MSW_GET_REF][tid], msw_get_ref); + + PROF_START(msw_1); + msw_avx512_u8(opt, endIdx - startIdx, msw_buf, refs, seqs, xtras, kswrs, tid); + PROF_END(tprof[T_MSW_1][tid], msw_1); + + // 第二阶段计算 + msw_ref_v* refs2 = &w->msw->t_msw_2_refs[tid]; + msw_seq_v* seqs2 = &w->msw->t_msw_2_seqs[tid]; + int_v* xtras2 = &w->msw->t_msw_2_xtras[tid]; + msw_kswr_v* kswrs2 = &w->msw->t_msw_2_kswrs[tid]; + int_v msw_task_ids = {0}; // 用于第二阶段任务统计 + + // 保存结果,并筛选第二阶段msw计算的任务 + for (i = startIdx, j = 0, k = 0; i < endIdx; ++i, ++j) { + msw_task_t* task = w->msw->p_msw_tasks_u8.a[i]; + kswr_avx_t r = kswrs->a[j]; + task->aln = r; + if ((task->xtra & KSW_XSTART) == 0 || ((task->xtra & KSW_XSUBO) && r.score < (task->xtra & 0xffff))) + continue; + task->xtra = KSW_XSTOP | task->aln.score; + kv_push(int, msw_task_ids, i); + // 1. 获取对应的ref + refs2->a[k] = refs->a[j]; + revseq(r.te + 1, refs2->a[k].ref); + // 2. 获取对应的seq + seqs2->a[k] = seqs->a[j]; + revseq(r.qe + 1, seqs2->a[k].seq); + seqs2->a[k].l_seq = r.qe + 1; + // 3. 其他数据 + xtras->a[k] = task->xtra; + kswrs2->a[k] = r; + ++k; + } + roundEndIdx = ((msw_task_ids.n + SIMD512_WIDTH8 - 1) / SIMD512_WIDTH8) * SIMD512_WIDTH8; + for (; k < roundEndIdx; ++k) { + refs2->a[k].l_ref = 0; + seqs2->a[k].l_seq = 0; + xtras2->a[k] = 0; + } + PROF_START(msw_2); + msw_avx512_u8(opt, msw_task_ids.n, msw_buf, refs2, seqs2, xtras2, kswrs2, tid); + PROF_END(tprof[T_MSW_2][tid], msw_2); + + // 结果赋值 + for (k = 0; k < msw_task_ids.n; ++k) { + i = msw_task_ids.a[k]; + msw_task_t* task = w->msw->p_msw_tasks_u8.a[i]; + task->aln = kswrs2->a[k]; } kv_destroy(msw_task_ids); -#endif } -static void worker_calc_matesw16(void* data, int i, int tid) {} +static void worker_calc_matesw_avx512_i16(void* data, int i, int tid) {} + +// 检测添加新的align +static int check_add_align(kswr_avx_t aln, int min_seed_len, int is_rev, int64_t l_pac, mem_alnreg_t* a, int l_ms, uint8_t* ms, mem_alnreg_v* ma, + int64_t rb) { + int res = 0; + if (aln.score >= min_seed_len && aln.qb >= 0) { // something goes wrong if aln.qb < 0 + int i, tmp; + mem_alnreg_t b; + memset(&b, 0, sizeof(mem_alnreg_t)); + b.rid = a->rid; + b.is_alt = a->is_alt; + b.qb = is_rev ? l_ms - (aln.qe + 1) : aln.qb; + b.qe = is_rev ? l_ms - aln.qb : aln.qe + 1; + b.rb = is_rev ? (l_pac << 1) - (rb + aln.te + 1) : rb + aln.tb; + b.re = is_rev ? (l_pac << 1) - (rb + aln.tb) : rb + aln.te + 1; + b.score = aln.score; + b.csub = aln.score2; + b.secondary = -1; + b.seedcov = (b.re - b.rb < b.qe - b.qb ? b.re - b.rb : b.qe - b.qb) >> 1; + // printf("*** %d, [%lld,%lld], %d:%d, (%lld,%lld), (%lld,%lld) == (%lld,%lld)\n", aln.score, rb, re, is_rev, + // is_larger, a->rb, a->re, ma->a[0].rb, ma->a[0].re, b.rb, b.re); + kv_push(mem_alnreg_t, *ma, b); // make room for a new element + // move b s.t. ma is sorted + for (i = 0; i < ma->n - 1; ++i) // find the insertion point + if (ma->a[i].score < b.score) + break; + tmp = i; + for (i = ma->n - 1; i > tmp; --i) ma->a[i] = ma->a[i - 1]; + ma->a[i] = b; + res = 1; + } + return res; +} + +#define raw_mapq(diff, a) ((int)(6.02 * (diff) / (a) + .499)) // 最后再计算并生成sam数据 -static void workder_gen_sam(void* data, int i, int tid) { +static void workder_gen_sam(void* data, int idx, int tid) { mem_worker_t* w = (mem_worker_t*)data; - free(w->regs[i << 1 | 0].a); - free(w->regs[i << 1 | 1].a); + const mem_opt_t* opt = w->opt; + const bntseq_t* bns = w->bns; + const uint8_t* pac = w->pac; + const mem_pestat_t *pes = w->pes; + + int startIdx = idx * opt->batch_size; + int endIdx = (idx + 1) * opt->batch_size; + if (endIdx > w->n_reads) + endIdx = w->n_reads; + + int id = 0, i = 0, j = 0, k = 0; + + for (id = startIdx; id < endIdx; id += 2) { + // 初始化变量 + bseq1_t* s = &w->seqs[id]; + mem_alnreg_v* a = &w->regs[id]; + seq_sam_t* ss = &w->sams[id]; + int z[2], o, subo, n_sub, extra_flag = 1, n_pri[2], n_aa[2]; + mem_aln_t h[2], g[2], aa[2][2]; + memset(h, 0, sizeof(mem_aln_t) * 2); + memset(g, 0, sizeof(mem_aln_t) * 2); + n_aa[0] = n_aa[1] = 0; + + for (i = 0; i < 2; ++i) { + int si = !i; + int n = 0; + // 这里应该先给task排序,因为u8和i16是分开排序的,需要合在一起 + for (k = 0; k < s[si].msw.n; ++k) { + msw_task_t* t = s[si].msw.a[k]; + n += check_add_align(t->aln, opt->min_seed_len, t->is_rev, bns->l_pac, &a[!si].a[t->aj], s[si].l_seq, (uint8_t*)s[si].seq, &a[si], t->rb); + } + if (n > 0) { + a[si].n = mem_sort_dedup_patch(opt, 0, 0, 0, a[si].n, a[si].a); + } + } + + //////////////////////////////////////////////////////////// + n_pri[0] = mem_mark_primary_se(opt, a[0].n, a[0].a, id << 1 | 0); + n_pri[1] = mem_mark_primary_se(opt, a[1].n, a[1].a, id << 1 | 1); + if (opt->flag & MEM_F_PRIMARY5) { + mem_reorder_primary5(opt->T, &a[0]); + mem_reorder_primary5(opt->T, &a[1]); + } + if (opt->flag & MEM_F_NOPAIRING) + goto no_pairing; + // pairing single-end hits + if (n_pri[0] && n_pri[1] && (o = mem_pair(opt, bns, pac, pes, s, a, id, &subo, &n_sub, z, n_pri)) > 0) { + int is_multi[2], q_pe, score_un, q_se[2]; + char** XA[2]; + // check if an end has multiple hits even after mate-SW + for (i = 0; i < 2; ++i) { + for (j = 1; j < n_pri[i]; ++j) + if (a[i].a[j].secondary < 0 && a[i].a[j].score >= opt->T) + break; + is_multi[i] = j < n_pri[i] ? 1 : 0; + } + if (is_multi[0] || is_multi[1]) + goto no_pairing; // TODO: in rare cases, the true hit may be long but with low score + // compute mapQ for the best SE hit + score_un = a[0].a[0].score + a[1].a[0].score - opt->pen_unpaired; + // q_pe = o && subo < o? (int)(MEM_MAPQ_COEF * (1. - (double)subo / o) * log(a[0].a[z[0]].seedcov + a[1].a[z[1]].seedcov) + .499) : 0; + subo = subo > score_un ? subo : score_un; + q_pe = raw_mapq(o - subo, opt->a); + if (n_sub > 0) + q_pe -= (int)(4.343 * log(n_sub + 1) + .499); + if (q_pe < 0) + q_pe = 0; + if (q_pe > 60) + q_pe = 60; + q_pe = (int)(q_pe * (1. - .5 * (a[0].a[0].frac_rep + a[1].a[0].frac_rep)) + .499); + // the following assumes no split hits + if (o > score_un) { // paired alignment is preferred + mem_alnreg_t* c[2]; + c[0] = &a[0].a[z[0]]; + c[1] = &a[1].a[z[1]]; + for (i = 0; i < 2; ++i) { + if (c[i]->secondary >= 0) + c[i]->sub = a[i].a[c[i]->secondary].score, c[i]->secondary = -2; + q_se[i] = mem_approx_mapq_se(opt, c[i]); + } + q_se[0] = q_se[0] > q_pe ? q_se[0] : q_pe < q_se[0] + 40 ? q_pe : q_se[0] + 40; + q_se[1] = q_se[1] > q_pe ? q_se[1] : q_pe < q_se[1] + 40 ? q_pe : q_se[1] + 40; + extra_flag |= 2; + // cap at the tandem repeat score + q_se[0] = q_se[0] < raw_mapq(c[0]->score - c[0]->csub, opt->a) ? q_se[0] : raw_mapq(c[0]->score - c[0]->csub, opt->a); + q_se[1] = q_se[1] < raw_mapq(c[1]->score - c[1]->csub, opt->a) ? q_se[1] : raw_mapq(c[1]->score - c[1]->csub, opt->a); + } else { // the unpaired alignment is preferred + z[0] = z[1] = 0; + q_se[0] = mem_approx_mapq_se(opt, &a[0].a[0]); + q_se[1] = mem_approx_mapq_se(opt, &a[1].a[0]); + } + for (i = 0; i < 2; ++i) { + int k = a[i].a[z[i]].secondary_all; + if (k >= 0 && k < n_pri[i]) { // switch secondary and primary if both of them are non-ALT + assert(a[i].a[k].secondary_all < 0); + for (j = 0; j < a[i].n; ++j) + if (a[i].a[j].secondary_all == k || j == k) + a[i].a[j].secondary_all = z[i]; + a[i].a[z[i]].secondary_all = -1; + } + } + + if (!(opt->flag & MEM_F_ALL)) { + for (i = 0; i < 2; ++i) XA[i] = mem_gen_alt(opt, bns, pac, &a[i], s[i].l_seq, s[i].seq); + } else + XA[0] = XA[1] = 0; + + // write SAM + + for (i = 0; i < 2; ++i) { + h[i] = mem_reg2aln(opt, bns, pac, s[i].l_seq, s[i].seq, &a[i].a[z[i]]); + h[i].mapq = q_se[i]; + h[i].flag |= 0x40 << i | extra_flag; + h[i].XA = XA[i] ? XA[i][z[i]] : 0; + aa[i][n_aa[i]++] = h[i]; + if (n_pri[i] < a[i].n) { // the read has ALT hits + mem_alnreg_t* p = &a[i].a[n_pri[i]]; + if (p->score < opt->T || p->secondary >= 0 || !p->is_alt) + continue; + g[i] = mem_reg2aln(opt, bns, pac, s[i].l_seq, s[i].seq, p); + g[i].flag |= 0x800 | 0x40 << i | extra_flag; + g[i].XA = XA[i] ? XA[i][n_pri[i]] : 0; + aa[i][n_aa[i]++] = g[i]; + } + } + ss[0].sam.l = 0; + //PROF_START(aln2sam); + for (i = 0; i < n_aa[0]; ++i) mem_aln2sam(opt, bns, &ss[0].sam, &s[0], n_aa[0], aa[0], i, &h[1]); // write read1 hits + ss[1].sam.l = 0; + for (i = 0; i < n_aa[1]; ++i) mem_aln2sam(opt, bns, &ss[1].sam, &s[1], n_aa[1], aa[1], i, &h[0]); // write read2 hits + if (strcmp(s[0].name, s[1].name) != 0) + err_fatal(__func__, "paired reads have different names: \"%s\", \"%s\"\n", s[0].name, s[1].name); + // free + for (i = 0; i < 2; ++i) { + free(h[i].cigar); + free(g[i].cigar); + if (XA[i] == 0) + continue; + for (j = 0; j < a[i].n; ++j) free(XA[i][j]); + free(XA[i]); + } + } else + goto no_pairing; + + no_pairing: + for (i = 0; i < 2; ++i) { + int which = -1; + if (a[i].n) { + if (a[i].a[0].score >= opt->T) + which = 0; + else if (n_pri[i] < a[i].n && a[i].a[n_pri[i]].score >= opt->T) + which = n_pri[i]; + } + if (which >= 0) + h[i] = mem_reg2aln(opt, bns, pac, s[i].l_seq, s[i].seq, &a[i].a[which]); + else + h[i] = mem_reg2aln(opt, bns, pac, s[i].l_seq, s[i].seq, 0); + } + if (!(opt->flag & MEM_F_NOPAIRING) && h[0].rid == h[1].rid && + h[0].rid >= 0) { // if the top hits from the two ends constitute a proper pair, flag it. + int64_t dist; + int d; + d = mem_infer_dir(bns->l_pac, a[0].a[0].rb, a[1].a[0].rb, &dist); + if (!pes[d].failed && dist >= pes[d].low && dist <= pes[d].high) + extra_flag |= 2; + } + mem_reg2sam(opt, bns, pac, &s[0], &a[0], 0x41 | extra_flag, &h[1], &ss[0]); + mem_reg2sam(opt, bns, pac, &s[1], &a[1], 0x81 | extra_flag, &h[0], &ss[1]); + if (strcmp(s[0].name, s[1].name) != 0) + err_fatal(__func__, "paired reads have different names: \"%s\", \"%s\"\n", s[0].name, s[1].name); + free(h[0].cigar); + free(h[1].cigar); + + _destory_clear_vec(s[0].msw); + _destory_clear_vec(s[1].msw); + + free(a[0].a); + free(a[1].a); + } } // 划分matesw任务 -static void gather_matesw_task(mem_worker_t* w, matesw_data_v** msw_arr, matesw_ptr_v* tasks) { - tasks->n = 0; +static void gather_matesw_task(mem_worker_t* w, msw_task_v* thread_tasks, msw_task_ptr_v* tasks) { + _clear_vec(*tasks); int i = 0, j = 0; for (i = 0; i < w->opt->n_threads; ++i) { - for (j = 0; j < msw_arr[i]->n; ++j) - kv_push(matesw_data_t*, *tasks, &kv_A(*msw_arr[i], j)); + for (j = 0; j < thread_tasks[i].n; ++j) { + msw_task_t* tp = &thread_tasks[i].a[j]; + kv_push(msw_task_t*, *tasks, tp); + kv_push(msw_task_t*, w->seqs[tp->seq_id].msw, tp); + } } } // 更新stats -static void update_mem_stats(mem_worker_t* w) { +static void update_msw_stats(mem_worker_t* w) { int i = 0, max_seq_len = 0, max_ref_len = 0; for (i = 0; i < w->opt->n_threads; ++i) { - max_seq_len = max_(max_seq_len, w->mem_stats[i].max_seq_len); - max_ref_len = max_(max_ref_len, w->mem_stats[i].max_msw_ref_len); - } - int quanta = (max_seq_len + 16 - 1) / 16; // based on SSE-8 bit lane - quanta *= 16; - max_seq_len = quanta + 1; - for (i = 0; i < w->opt->n_threads; ++i) { - w->mem_stats[i].max_seq_len = max_seq_len; - w->mem_stats[i].max_msw_ref_len = max_ref_len; + max_ref_len = max_(max_ref_len, w->msw->t_msw_stats[i].max_ref_len); + max_seq_len = max_(max_seq_len, w->msw->t_msw_stats[i].max_seq_len); } + int quanta = ((max_seq_len + 16 - 1) / 16) * 16; // based on SSE-8 bit lane + max_seq_len = quanta + 1; // 这里需要加一,因为ksw_avx512里赋值的时候是 <= ncol + + w->msw->p_msw_stats.max_ref_len = max_ref_len + 1; + w->msw->p_msw_stats.max_seq_len = max_seq_len; } // 开辟缓冲区 static void alloc_update_cache_avx512(mem_worker_t* w) { - int i = 0, j = 0; + int i = 0; for (i = 0; i < w->opt->n_threads; ++i) { - // 检查ref缓存空间 - byte_vv* refs = &w->msw_ref[i]; - if (refs->n < w->opt->msw_batch_size) { // 初始化ref缓存 - for (j = refs->n; j < w->opt->msw_batch_size; ++j) { - byte_v *p = kv_pushp(byte_v, *refs); - kv_init(*p); - } - } - msw_ref_v* refs2 = &w->msw_2_ref[i]; - if (refs2->n < w->opt->msw_batch_size) { // 初始化seq缓存 - for (j = refs2->n; j < w->opt->msw_batch_size; ++j) { - msw_ref_t* p = kv_pushp(msw_ref_t, *refs2); - p->l_ref = 0; - p->ref = 0; - } - } - - // 检查seq缓存空间 - msw_seq_v* seqs = &w->msw_seq[i]; - if (seqs->n < w->opt->msw_batch_size) { // 初始化seq缓存 - for (j = seqs->n; j < w->opt->msw_batch_size; ++j) { - msw_seq_t* p = kv_pushp(msw_seq_t, *seqs); - p->l_seq = 0; - p->seq = 0; - } - } - msw_seq_v* seqs2 = &w->msw_2_seq[i]; - if (seqs2->n < w->opt->msw_batch_size) { // 初始化seq缓存 - for (j = seqs2->n; j < w->opt->msw_batch_size; ++j) { - msw_seq_t* p = kv_pushp(msw_seq_t, *seqs2); - p->l_seq = 0; - p->seq = 0; - } - } - - msw_kswr_v* kswrs = &w->msw_kswr[i]; // 初始化msw结果 - if (kswrs->n < w->opt->msw_batch_size) { - for (j = kswrs->n; j < w->opt->msw_batch_size; ++j) { - kv_pushp(kswr_avx_t, *kswrs); - } - } - - msw_kswr_v* kswrs2 = &w->msw_2_kswr[i]; // 初始化msw结果 - if (kswrs2->n < w->opt->msw_batch_size) { - for (j = kswrs2->n; j < w->opt->msw_batch_size; ++j) { - kv_pushp(kswr_avx_t, *kswrs2); - } - } - - int_v* xtras = &w->msw_xtra[i]; // 初始化xtra - if (xtras->n < w->opt->msw_batch_size) { - for (j = xtras->n; j < w->opt->msw_batch_size; ++j) { - kv_pushp(int, *xtras); - } - } - - int_v* xtras2 = &w->msw_2_xtra[i]; // 初始化xtra - if (xtras2->n < w->opt->msw_batch_size) { - for (j = xtras2->n; j < w->opt->msw_batch_size; ++j) { - kv_pushp(int, *xtras2); - } - } - - matesw_buf_t* b = &w->msw_buf[i]; + msw_buf_t* b = &w->msw->t_msw_buf[i]; // 更新跟ref len有关的缓冲区 - if (b->ref_len < w->mem_stats[i].max_msw_ref_len) { - b->ref_len = w->mem_stats[i].max_msw_ref_len; + if (b->ref_len < w->msw->p_msw_stats.max_ref_len) { + b->ref_len = w->msw->p_msw_stats.max_ref_len; if (b->refArr) _mm_free(b->refArr); if (b->rowMax) _mm_free(b->rowMax); b->refArr = (uint8_t*)_mm_malloc(b->ref_len * SIMD512_WIDTH8 * sizeof(uint8_t), 64); b->rowMax = (uint8_t*)_mm_malloc(b->ref_len * SIMD512_WIDTH8 * sizeof(uint8_t), 64); - for (j = 0; j < w->opt->msw_batch_size; ++j) { - byte_v* ref = &kv_A(*refs, j); - kv_resize(uint8_t, *ref, b->ref_len); - // ref->n = ref->m; - } } // 更新跟seq len有关的缓冲区 - if (b->seq_len < w->mem_stats[i].max_seq_len) { - b->seq_len = w->mem_stats[i].max_seq_len; + if (b->seq_len < w->msw->p_msw_stats.max_seq_len) { + b->seq_len = w->msw->p_msw_stats.max_seq_len; if (b->seqArr) _mm_free(b->seqArr); if (b->H0) _mm_free(b->H0); if (b->H1) _mm_free(b->H1); @@ -476,37 +625,45 @@ static void alloc_update_cache_avx512(mem_worker_t* w) { void gen_paired_sam(mem_worker_t* w) { int i = 0; for (i = 0; i < w->opt->n_threads; ++i) { - w->matesw_arr8[i]->n = 0; // 清空之前的数据 - w->matesw_arr16[i]->n = 0; + w->msw->t_msw_tasks_u8[i].n = 0; // 清空之前的数据 + w->msw->t_msw_tasks_i16[i].n = 0; } if (w->opt->flag & MEM_F_NO_RESCUE) { kt_for(w->opt->n_threads, worker_sam, w, w->n_reads >> 1); // generate alignment } else { int batch_n = (w->n_reads + w->opt->batch_size - 1) / w->opt->batch_size; + // 1. 计算哪些read需要matesw PROF_START(get_matesw_data); - kt_for(w->opt->n_threads, worker_matesw_data, w, batch_n); + kt_for(w->opt->n_threads, worker_matesw_tasks, w, batch_n); PROF_END(gprof[G_get_matesw_data], get_matesw_data); + // 更新stats PROF_START(update_stats_cache); - update_mem_stats(w); + update_msw_stats(w); + // 开辟缓冲区 alloc_update_cache_avx512(w); PROF_END(gprof[G_update_stats_cache], update_stats_cache); - // 2. matesw计算过程 + + // 2. 收集每个线程中的msw任务 PROF_START(gather_matesw_task); - gather_matesw_task(w, w->matesw_arr8, &w->msw_tasks8); - gather_matesw_task(w, w->matesw_arr16, &w->msw_tasks16); + gather_matesw_task(w, w->msw->t_msw_tasks_u8, &w->msw->p_msw_tasks_u8); + gather_matesw_task(w, w->msw->t_msw_tasks_i16, &w->msw->p_msw_tasks_i16); PROF_END(gprof[G_gather_matesw_task], gather_matesw_task); PROF_START(calc_matesw); - int msw_batch_n = (w->msw_tasks8.n + w->opt->msw_batch_size - 1) / w->opt->msw_batch_size; - if (w->msw_tasks8.n > 0) + + int msw_batch_n = (w->msw->p_msw_tasks_u8.n + w->opt->msw_batch_size - 1) / w->opt->msw_batch_size; + + // 3. 处理msw任务 + if (w->msw->p_msw_tasks_u8.n > 0) kt_for(w->opt->n_threads, worker_calc_matesw_avx512_u8, w, msw_batch_n); - if (w->msw_tasks16.n > 0) - kt_for(w->opt->n_threads, worker_calc_matesw16, w, msw_batch_n); + if (w->msw->p_msw_tasks_i16.n > 0) + kt_for(w->opt->n_threads, worker_calc_matesw_avx512_i16, w, msw_batch_n); PROF_END(gprof[G_calc_matesw], calc_matesw); - // 3. - kt_for(w->opt->n_threads, workder_gen_sam, w, w->n_reads >> 1); + + // 4. 生成sam + kt_for(w->opt->n_threads, workder_gen_sam, w, batch_n); } - fprintf(stderr, "zzh %d : 8: %ld 16: %ld\n", i, w->msw_tasks8.n, w->msw_tasks16.n); + fprintf(stderr, "zzh %d : 8: %ld 16: %ld\n", i, w->msw->p_msw_tasks_u8.n, w->msw->p_msw_tasks_i16.n); } \ No newline at end of file diff --git a/paired_sam.h b/paired_sam.h index 337f314..ac6ddd9 100644 --- a/paired_sam.h +++ b/paired_sam.h @@ -14,6 +14,8 @@ #define SIMD512_WIDTH8 64 #define SIMD512_WIDTH16 32 + + void gen_paired_sam(mem_worker_t* w); extern int mem_sam_pe(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, const mem_pestat_t pes[4], uint64_t id, bseq1_t s[2], @@ -21,4 +23,22 @@ extern int mem_sam_pe(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* extern void mem_reg2ovlp(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, bseq1_t* s, mem_alnreg_v* a); -extern void kt_for(int n_threads, void (*func)(void*, int, int), void* data, int n); \ No newline at end of file +extern void kt_for(int n_threads, void (*func)(void*, int, int), void* data, int n); + +extern int mem_sort_dedup_patch(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, uint8_t* query, int n, mem_alnreg_t* a); + +extern void mem_reg2sam(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, bseq1_t* s, mem_alnreg_v* a, int extra_flag, + const mem_aln_t* m, seq_sam_t* ss); + +extern void mem_aln2sam(const mem_opt_t* opt, const bntseq_t* bns, kstring_t* str, bseq1_t* s, int n, const mem_aln_t* list, int which, + const mem_aln_t* m); + +extern int mem_mark_primary_se(const mem_opt_t* opt, int n, mem_alnreg_t* a, int64_t id); +extern int mem_approx_mapq_se(const mem_opt_t* opt, const mem_alnreg_t* a); + +extern char** mem_gen_alt(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, const mem_alnreg_v* a, int l_query, const char* query); + +extern void mem_reorder_primary5(int T, mem_alnreg_v* a); + +extern int mem_pair(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, const mem_pestat_t pes[4], bseq1_t s[2], mem_alnreg_v a[2], int id, + int* sub, int* n_sub, int z[2], int n_pri[2]); \ No newline at end of file diff --git a/run.sh b/run.sh index 1446e8a..a52b5a8 100755 --- a/run.sh +++ b/run.sh @@ -7,8 +7,8 @@ n2=~/data/dataset/D2/n2.fq.gz reference=~/data/reference/fmt/human_g1k_v37_decoy.fasta -out=/dev/null -#out=./out.sam +#out=/dev/null +out=./D2-out.sam prog=./fastalign #prog=/home/zzh/fastbwa diff --git a/utils.h b/utils.h index 826c5c2..f85bd42 100644 --- a/utils.h +++ b/utils.h @@ -34,6 +34,18 @@ #include "profiling.h" #include "kvec.h" +#define _destory_clear_vec(arr) \ + do { \ + free((arr).a); \ + (arr).a = 0; \ + (arr).m = (arr).n = 0; \ + } while (0) + +#define _clear_vec(arr) \ + do { \ + (arr).n = 0; \ + } while (0) + #undef MAX #undef MIN #define MAX(x, y) ((x) > (y) ? (x) : (y))