清理了一下代码,优雅的处理u8和i16

This commit is contained in:
zzh 2026-01-18 01:59:42 +08:00
parent 56c687d23d
commit fd21021e6a
4 changed files with 58 additions and 50 deletions

View File

@ -3,10 +3,15 @@
// 初始化mate sw计算相关的数据
void init_msw_data(msw_data_t* msw, int n_threads, int msw_batch_size) {
#define _alloc_msw_num(data, num, data_type) ((data) = calloc(num, sizeof(data_type)))
#define _alloc_msw(data, data_type) ((data) = calloc(n_threads, sizeof(data_type)))
int i = 0, j = 0;
_alloc_msw(msw->t_msw_tasks, msw_task_v*);
for (i = 0; i < n_threads; ++i) {
_alloc_msw_num(msw->t_msw_tasks[i], 2, msw_task_v);
}
_alloc_msw(msw->t_msw_tasks_u8, msw_task_v);
_alloc_msw(msw->t_msw_tasks_i16, msw_task_v);
_alloc_msw(msw->t_msw_buf, msw_buf_t);
_alloc_msw(msw->t_msw_stats, msw_stats_t);
_alloc_msw(msw->t_msw_ref_buf, byte_vv);
@ -23,7 +28,6 @@ void init_msw_data(msw_data_t* msw, int n_threads, int msw_batch_size) {
_alloc_msw(msw->t_msw_2_kswrs, msw_kswr_v);
// 初始化缓存空间
int i = 0, j = 0;
msw_ref_t init_ref = {0};
msw_seq_t init_seq = {0};
kswr_avx_t init_kswr = {0};

View File

@ -34,8 +34,8 @@ typedef kvec_t(msw_task_t*) msw_task_ptr_v;
// 每个seq保留task所在的线程和数组idx
typedef struct {
int arr_idx; // 是u8还是i16
int thread_idx; // 在哪个线程
int arr_idx; // 是u8还是i16
int task_idx; // 数组内的索引
} msw_seq_task_t;
@ -73,9 +73,8 @@ typedef kvec_t(kswr_avx_t) msw_kswr_v;
// mate sw相关的所有数据
typedef struct {
msw_task_v* t_msw_tasks_u8; // 线程内收集需要做mate sw的任务的数据
msw_task_v* t_msw_tasks_i16;
msw_task_ptr_v p_msw_tasks_u8; // 线程共享的mate sw task指针
msw_task_v** t_msw_tasks; // 0是u8, 1是i16线程内收集需要做mate sw的任务的数据
msw_task_ptr_v p_msw_tasks_u8; // 线程共享的mate sw task指针
msw_task_ptr_v p_msw_tasks_i16;
msw_buf_t* t_msw_buf; // 每个线程一个mate sw 缓存
msw_stats_t* t_msw_stats; // 每个线程一个,统计信息

View File

@ -77,7 +77,7 @@ static inline void revseq(int l, uint8_t* s) {
// 计算当前seq是否需要做matesw需要的话保存必要的数据
static void get_matesw_tasks(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, const mem_pestat_t pes[4], const mem_alnreg_t* a, int a_j,
bseq1_t* seq, mem_alnreg_v* ma, int64_t sid, msw_stats_t* stats, msw_task_v* msw8, msw_task_v* msw16, int64_t n_processed, int tid) {
bseq1_t* seq, mem_alnreg_v* ma, int64_t sid, msw_stats_t* stats, msw_task_v* msw_tasks, int64_t n_processed, int tid) {
int64_t l_pac = bns->l_pac;
int l_ms = seq->l_seq;
int i, r, skip[4], rid;
@ -119,10 +119,11 @@ static void get_matesw_tasks(const mem_opt_t* opt, const bntseq_t* bns, const ui
// fprintf(stderr, "zzh here\n");
int xtra = KSW_XSUBO | KSW_XSTART | (l_ms * opt->a < 250 ? KSW_XBYTE : 0) | (opt->min_seed_len * opt->a);
msw_task_t* p;
if ((xtra & KSW_XBYTE))
p = kv_pushp(msw_task_t, *msw8);
else
p = kv_pushp(msw_task_t, *msw16);
int msw_idx = 0; // u8
if (!(xtra & KSW_XBYTE))
msw_idx = 1; // i16
msw_task_v* task_arr = &msw_tasks[msw_idx];
p = kv_pushp(msw_task_t, *task_arr);
p->is_rev = is_rev;
p->skip = skip[0] | skip[1] << 1 | skip[2] << 2 | skip[3] << 3;
p->xtra = xtra;
@ -132,10 +133,9 @@ static void get_matesw_tasks(const mem_opt_t* opt, const bntseq_t* bns, const ui
p->aj = a_j;
p->to = task_order++; // 有啥用? 应该是用来合并u8和i16的时候排序用
msw_seq_task_t seq_task = {0, tid, msw8->n - 1};
msw_seq_task_t seq_task = {tid, msw_idx, task_arr->n - 1};
kv_push(msw_seq_task_t, seq->msw_task, seq_task);
// kv_push(msw_task_t*, seq->msw, p); // 将matesw任务和对应的seq关联起来这里放指针是不行的因为指针位置会变要保存offset才行
// kv_push(int, seq->msw, msw8->n - 1); // 这里需要考虑i16, 本线程的msw的offset
// 将matesw任务和对应的seq关联起来这里放指针是不行的因为指针位置会变要保存offset才行
}
}
}
@ -155,6 +155,7 @@ static void worker_matesw_tasks(void* data, int idx, int tid) {
kv_init(aj[1]);
kv_init(b[0]);
kv_init(b[1]);
for (id = startIdx; id < endIdx; id += 2) {
int64_t i_s = id;
bseq1_t* s = &w->seqs[i_s];
@ -176,7 +177,7 @@ static void worker_matesw_tasks(void* data, int idx, int tid) {
for (i = 0; i < 2; ++i)
for (j = 0; j < b[i].n && j < opt->max_matesw; ++j)
get_matesw_tasks(opt, w->bns, w->pac, w->pes, &b[i].a[j], aj[i].a[j], &s[!i], &a[!i], i_s + !i, &w->msw->t_msw_stats[tid],
&w->msw->t_msw_tasks_u8[tid], &w->msw->t_msw_tasks_i16[tid], w->n_processed, tid);
w->msw->t_msw_tasks[tid], w->n_processed, tid);
}
free(b[0].a);
free(b[1].a);
@ -559,9 +560,8 @@ no_pairing:
free(h[1].cigar);
end_clear:
//_destory_clear_vec(s[0].msw);
//_destory_clear_vec(s[1].msw);
// 清理seq相关的内存空间
_destory_clear_vec(s[0].msw_task);
_destory_clear_vec(s[1].msw_task);
@ -612,14 +612,13 @@ static void workder_gen_sam(void* data, int idx, int tid) {
#ifdef SAM_EXACT
_clear_vec(bb[0]);
_clear_vec(bb[1]);
for (i = 0; i < 2; ++i)
//for (j = 0; j < s[!i].msw.n; ++j) {
// msw_task_t* t = s[!i].msw.a[j];
for (i = 0; i < 2; ++i) {
for (k = 0; k < s[!i].msw_task.n; ++k) {
msw_seq_task_t st = s[!i].msw_task.a[k];
msw_task_t* t = &w->msw->t_msw_tasks_u8[st.thread_idx].a[st.task_idx];
msw_task_t* t = &w->msw->t_msw_tasks[st.thread_idx][st.arr_idx].a[st.task_idx];
kv_push(mem_alnreg_t, bb[i], a[i].a[t->aj]);
}
}
#endif
for (i = 0; i < 2; ++i) {
@ -627,12 +626,10 @@ static void workder_gen_sam(void* data, int idx, int tid) {
#ifndef SAM_EXACT
int origin_n = a[si].n;
#endif
// 这里应该先给task排序因为u8和i16是分开排序的需要合在一起
//for (k = 0; k < s[si].msw.n; ++k) {
// msw_task_t* t = s[si].msw.a[k];
// 这里应该先给task排序因为u8和i16是分开排序的需要合在一起不用了添加task的时候已经排序过了
for (k = 0; k < s[si].msw_task.n; ++k) {
msw_seq_task_t st = s[si].msw_task.a[k];
msw_task_t* t = &w->msw->t_msw_tasks_u8[st.thread_idx].a[st.task_idx];
msw_task_t* t = &w->msw->t_msw_tasks[st.thread_idx][st.arr_idx].a[st.task_idx];
#ifdef SAM_EXACT
mem_alnreg_t* b = &bb[i].a[k];
#else
@ -691,14 +688,18 @@ static void workder_gen_sam(void* data, int idx, int tid) {
}
// 划分matesw任务
static void gather_matesw_task(mem_worker_t* w, msw_task_v* thread_tasks, msw_task_ptr_v* tasks) {
_clear_vec(*tasks);
static void gather_matesw_task(mem_worker_t* w, msw_task_v** thread_tasks) {
_clear_vec(w->msw->p_msw_tasks_u8);
_clear_vec(w->msw->p_msw_tasks_i16);
int i = 0, j = 0;
for (i = 0; i < w->opt->n_threads; ++i) {
for (j = 0; j < thread_tasks[i].n; ++j) {
msw_task_t* tp = &thread_tasks[i].a[j];
kv_push(msw_task_t*, *tasks, tp);
// kv_push(msw_task_t*, w->seqs[tp->seq_id].msw, tp);
for (j = 0; j < thread_tasks[i][0].n; ++j) {
msw_task_t* tp = &thread_tasks[i][0].a[j];
kv_push(msw_task_t*, w->msw->p_msw_tasks_u8, tp);
}
for (j = 0; j < thread_tasks[i][1].n; ++j) {
msw_task_t* tp = &thread_tasks[i][1].a[j];
kv_push(msw_task_t*, w->msw->p_msw_tasks_i16, tp);
}
}
}
@ -749,14 +750,15 @@ static void alloc_update_cache_avx512(mem_worker_t* w) {
// 针对pair end数据生成sam的过程
void gen_paired_sam(mem_worker_t* w) {
int i = 0;
for (i = 0; i < w->opt->n_threads; ++i) {
w->msw->t_msw_tasks_u8[i].n = 0; // 清空之前的数据
w->msw->t_msw_tasks_i16[i].n = 0;
}
if (w->opt->flag & MEM_F_NO_RESCUE) {
kt_for(w->opt->n_threads, worker_sam, w, w->n_reads >> 1); // generate alignment
} else {
// 清空一下任务数组,不能放到线程里去做
int i = 0;
for (i = 0; i < w->opt->n_threads; ++i) {
_clear_vec(w->msw->t_msw_tasks[i][0]);
_clear_vec(w->msw->t_msw_tasks[i][1]);
}
int batch_n = (w->n_reads + w->opt->batch_size - 1) / w->opt->batch_size;
// 1. 计算哪些read需要matesw
@ -773,17 +775,20 @@ void gen_paired_sam(mem_worker_t* w) {
// 2. 收集每个线程中的msw任务
PROF_START(gather_matesw_task);
gather_matesw_task(w, w->msw->t_msw_tasks_u8, &w->msw->p_msw_tasks_u8);
gather_matesw_task(w, w->msw->t_msw_tasks_i16, &w->msw->p_msw_tasks_i16);
gather_matesw_task(w, w->msw->t_msw_tasks);
PROF_END(gprof[G_gather_matesw_task], gather_matesw_task);
PROF_START(calc_matesw);
int msw_batch_n = (w->msw->p_msw_tasks_u8.n + w->opt->msw_batch_size - 1) / w->opt->msw_batch_size;
int msw_batch_n = 0;
// 3. 处理msw任务
if (w->msw->p_msw_tasks_u8.n > 0)
if (w->msw->p_msw_tasks_u8.n > 0) {
msw_batch_n = (w->msw->p_msw_tasks_u8.n + w->opt->msw_batch_size - 1) / w->opt->msw_batch_size;
kt_for(w->opt->n_threads, worker_calc_matesw_avx512_u8, w, msw_batch_n);
if (w->msw->p_msw_tasks_i16.n > 0)
}
if (w->msw->p_msw_tasks_i16.n > 0) {
msw_batch_n = (w->msw->p_msw_tasks_i16.n + w->opt->msw_batch_size - 1) / w->opt->msw_batch_size;
kt_for(w->opt->n_threads, worker_calc_matesw_avx512_i16, w, msw_batch_n);
}
PROF_END(gprof[G_calc_matesw], calc_matesw);
// 4. 生成sam
@ -791,5 +796,5 @@ void gen_paired_sam(mem_worker_t* w) {
kt_for(w->opt->n_threads, workder_gen_sam, w, batch_n);
PROF_END(gprof[G_gen_sam], gen_sam);
}
fprintf(stderr, "zzh %d : 8: %ld 16: %ld\n", i, w->msw->p_msw_tasks_u8.n, w->msw->p_msw_tasks_i16.n);
fprintf(stderr, "zzh : u8: %ld i16: %ld\n", w->msw->p_msw_tasks_u8.n, w->msw->p_msw_tasks_i16.n);
}

14
run.sh
View File

@ -1,11 +1,11 @@
thread=64
thread=1
make clean; make -j 32
n1=~/data/dataset/D2/n1.fq.gz
n2=~/data/dataset/D2/n2.fq.gz
#n1=~/data/dataset/D2/d1.fq
#n2=~/data/dataset/D2/d2.fq
#n1=~/data/dataset/D2/n1.fq.gz
#n2=~/data/dataset/D2/n2.fq.gz
n1=~/data/dataset/D2/d1.fq
n2=~/data/dataset/D2/d2.fq
#n1=~/data/SRR25735656_1.fastq.gz
#n2=~/data/SRR25735656_2.fastq.gz
@ -17,8 +17,8 @@ reference=~/data/reference/fmt/human_g1k_v37_decoy.fasta
#out=/dev/null
#out=./oldsam-D2-out.sam
out=./D2-1.sam
#out=./d.sam
#out=./D2-1.sam
out=./d.sam
prog=./fastalign
#prog=/home/zzh/fastbwa