diff --git a/mate_sw.c b/mate_sw.c index 285c9e7..6a1891a 100644 --- a/mate_sw.c +++ b/mate_sw.c @@ -3,10 +3,15 @@ // 初始化mate sw计算相关的数据 void init_msw_data(msw_data_t* msw, int n_threads, int msw_batch_size) { +#define _alloc_msw_num(data, num, data_type) ((data) = calloc(num, sizeof(data_type))) #define _alloc_msw(data, data_type) ((data) = calloc(n_threads, sizeof(data_type))) + int i = 0, j = 0; + + _alloc_msw(msw->t_msw_tasks, msw_task_v*); + for (i = 0; i < n_threads; ++i) { + _alloc_msw_num(msw->t_msw_tasks[i], 2, msw_task_v); + } - _alloc_msw(msw->t_msw_tasks_u8, msw_task_v); - _alloc_msw(msw->t_msw_tasks_i16, msw_task_v); _alloc_msw(msw->t_msw_buf, msw_buf_t); _alloc_msw(msw->t_msw_stats, msw_stats_t); _alloc_msw(msw->t_msw_ref_buf, byte_vv); @@ -23,7 +28,6 @@ void init_msw_data(msw_data_t* msw, int n_threads, int msw_batch_size) { _alloc_msw(msw->t_msw_2_kswrs, msw_kswr_v); // 初始化缓存空间 - int i = 0, j = 0; msw_ref_t init_ref = {0}; msw_seq_t init_seq = {0}; kswr_avx_t init_kswr = {0}; diff --git a/mate_sw.h b/mate_sw.h index 30a4a42..47932c5 100644 --- a/mate_sw.h +++ b/mate_sw.h @@ -34,8 +34,8 @@ typedef kvec_t(msw_task_t*) msw_task_ptr_v; // 每个seq保留task所在的线程,和数组idx typedef struct { - int arr_idx; // 是u8还是i16 int thread_idx; // 在哪个线程 + int arr_idx; // 是u8还是i16 int task_idx; // 数组内的索引 } msw_seq_task_t; @@ -73,9 +73,8 @@ typedef kvec_t(kswr_avx_t) msw_kswr_v; // mate sw相关的所有数据 typedef struct { - msw_task_v* t_msw_tasks_u8; // 线程内收集需要做mate sw的任务的数据 - msw_task_v* t_msw_tasks_i16; - msw_task_ptr_v p_msw_tasks_u8; // 线程共享的mate sw task指针 + msw_task_v** t_msw_tasks; // 0是u8, 1是i16,线程内收集需要做mate sw的任务的数据 + msw_task_ptr_v p_msw_tasks_u8; // 线程共享的mate sw task指针 msw_task_ptr_v p_msw_tasks_i16; msw_buf_t* t_msw_buf; // 每个线程一个,mate sw 缓存 msw_stats_t* t_msw_stats; // 每个线程一个,统计信息 diff --git a/paired_sam.c b/paired_sam.c index 9464733..60ac32d 100644 --- a/paired_sam.c +++ b/paired_sam.c @@ -77,7 +77,7 @@ static inline void revseq(int l, uint8_t* s) { // 计算当前seq是否需要做matesw,需要的话保存必要的数据 static void get_matesw_tasks(const mem_opt_t* opt, const bntseq_t* bns, const uint8_t* pac, const mem_pestat_t pes[4], const mem_alnreg_t* a, int a_j, - bseq1_t* seq, mem_alnreg_v* ma, int64_t sid, msw_stats_t* stats, msw_task_v* msw8, msw_task_v* msw16, int64_t n_processed, int tid) { + bseq1_t* seq, mem_alnreg_v* ma, int64_t sid, msw_stats_t* stats, msw_task_v* msw_tasks, int64_t n_processed, int tid) { int64_t l_pac = bns->l_pac; int l_ms = seq->l_seq; int i, r, skip[4], rid; @@ -119,10 +119,11 @@ static void get_matesw_tasks(const mem_opt_t* opt, const bntseq_t* bns, const ui // fprintf(stderr, "zzh here\n"); int xtra = KSW_XSUBO | KSW_XSTART | (l_ms * opt->a < 250 ? KSW_XBYTE : 0) | (opt->min_seed_len * opt->a); msw_task_t* p; - if ((xtra & KSW_XBYTE)) - p = kv_pushp(msw_task_t, *msw8); - else - p = kv_pushp(msw_task_t, *msw16); + int msw_idx = 0; // u8 + if (!(xtra & KSW_XBYTE)) + msw_idx = 1; // i16 + msw_task_v* task_arr = &msw_tasks[msw_idx]; + p = kv_pushp(msw_task_t, *task_arr); p->is_rev = is_rev; p->skip = skip[0] | skip[1] << 1 | skip[2] << 2 | skip[3] << 3; p->xtra = xtra; @@ -132,10 +133,9 @@ static void get_matesw_tasks(const mem_opt_t* opt, const bntseq_t* bns, const ui p->aj = a_j; p->to = task_order++; // 有啥用? 应该是用来合并u8和i16的时候排序用 - msw_seq_task_t seq_task = {0, tid, msw8->n - 1}; + msw_seq_task_t seq_task = {tid, msw_idx, task_arr->n - 1}; kv_push(msw_seq_task_t, seq->msw_task, seq_task); - // kv_push(msw_task_t*, seq->msw, p); // 将matesw任务和对应的seq关联起来,这里放指针是不行的,因为指针位置会变,要保存offset才行 - // kv_push(int, seq->msw, msw8->n - 1); // 这里需要考虑i16, 本线程的msw的offset + // 将matesw任务和对应的seq关联起来,这里放指针是不行的,因为指针位置会变,要保存offset才行 } } } @@ -155,6 +155,7 @@ static void worker_matesw_tasks(void* data, int idx, int tid) { kv_init(aj[1]); kv_init(b[0]); kv_init(b[1]); + for (id = startIdx; id < endIdx; id += 2) { int64_t i_s = id; bseq1_t* s = &w->seqs[i_s]; @@ -176,7 +177,7 @@ static void worker_matesw_tasks(void* data, int idx, int tid) { for (i = 0; i < 2; ++i) for (j = 0; j < b[i].n && j < opt->max_matesw; ++j) get_matesw_tasks(opt, w->bns, w->pac, w->pes, &b[i].a[j], aj[i].a[j], &s[!i], &a[!i], i_s + !i, &w->msw->t_msw_stats[tid], - &w->msw->t_msw_tasks_u8[tid], &w->msw->t_msw_tasks_i16[tid], w->n_processed, tid); + w->msw->t_msw_tasks[tid], w->n_processed, tid); } free(b[0].a); free(b[1].a); @@ -559,9 +560,8 @@ no_pairing: free(h[1].cigar); end_clear: - //_destory_clear_vec(s[0].msw); - //_destory_clear_vec(s[1].msw); +// 清理seq相关的内存空间 _destory_clear_vec(s[0].msw_task); _destory_clear_vec(s[1].msw_task); @@ -612,14 +612,13 @@ static void workder_gen_sam(void* data, int idx, int tid) { #ifdef SAM_EXACT _clear_vec(bb[0]); _clear_vec(bb[1]); - for (i = 0; i < 2; ++i) - //for (j = 0; j < s[!i].msw.n; ++j) { - // msw_task_t* t = s[!i].msw.a[j]; + for (i = 0; i < 2; ++i) { for (k = 0; k < s[!i].msw_task.n; ++k) { msw_seq_task_t st = s[!i].msw_task.a[k]; - msw_task_t* t = &w->msw->t_msw_tasks_u8[st.thread_idx].a[st.task_idx]; + msw_task_t* t = &w->msw->t_msw_tasks[st.thread_idx][st.arr_idx].a[st.task_idx]; kv_push(mem_alnreg_t, bb[i], a[i].a[t->aj]); } + } #endif for (i = 0; i < 2; ++i) { @@ -627,12 +626,10 @@ static void workder_gen_sam(void* data, int idx, int tid) { #ifndef SAM_EXACT int origin_n = a[si].n; #endif - // 这里应该先给task排序,因为u8和i16是分开排序的,需要合在一起 - //for (k = 0; k < s[si].msw.n; ++k) { - // msw_task_t* t = s[si].msw.a[k]; + // 这里应该先给task排序,因为u8和i16是分开排序的,需要合在一起,不用了,添加task的时候已经排序过了 for (k = 0; k < s[si].msw_task.n; ++k) { msw_seq_task_t st = s[si].msw_task.a[k]; - msw_task_t* t = &w->msw->t_msw_tasks_u8[st.thread_idx].a[st.task_idx]; + msw_task_t* t = &w->msw->t_msw_tasks[st.thread_idx][st.arr_idx].a[st.task_idx]; #ifdef SAM_EXACT mem_alnreg_t* b = &bb[i].a[k]; #else @@ -691,14 +688,18 @@ static void workder_gen_sam(void* data, int idx, int tid) { } // 划分matesw任务 -static void gather_matesw_task(mem_worker_t* w, msw_task_v* thread_tasks, msw_task_ptr_v* tasks) { - _clear_vec(*tasks); +static void gather_matesw_task(mem_worker_t* w, msw_task_v** thread_tasks) { + _clear_vec(w->msw->p_msw_tasks_u8); + _clear_vec(w->msw->p_msw_tasks_i16); int i = 0, j = 0; for (i = 0; i < w->opt->n_threads; ++i) { - for (j = 0; j < thread_tasks[i].n; ++j) { - msw_task_t* tp = &thread_tasks[i].a[j]; - kv_push(msw_task_t*, *tasks, tp); - // kv_push(msw_task_t*, w->seqs[tp->seq_id].msw, tp); + for (j = 0; j < thread_tasks[i][0].n; ++j) { + msw_task_t* tp = &thread_tasks[i][0].a[j]; + kv_push(msw_task_t*, w->msw->p_msw_tasks_u8, tp); + } + for (j = 0; j < thread_tasks[i][1].n; ++j) { + msw_task_t* tp = &thread_tasks[i][1].a[j]; + kv_push(msw_task_t*, w->msw->p_msw_tasks_i16, tp); } } } @@ -749,14 +750,15 @@ static void alloc_update_cache_avx512(mem_worker_t* w) { // 针对pair end数据,生成sam的过程 void gen_paired_sam(mem_worker_t* w) { - int i = 0; - for (i = 0; i < w->opt->n_threads; ++i) { - w->msw->t_msw_tasks_u8[i].n = 0; // 清空之前的数据 - w->msw->t_msw_tasks_i16[i].n = 0; - } if (w->opt->flag & MEM_F_NO_RESCUE) { kt_for(w->opt->n_threads, worker_sam, w, w->n_reads >> 1); // generate alignment } else { + // 清空一下任务数组,不能放到线程里去做 + int i = 0; + for (i = 0; i < w->opt->n_threads; ++i) { + _clear_vec(w->msw->t_msw_tasks[i][0]); + _clear_vec(w->msw->t_msw_tasks[i][1]); + } int batch_n = (w->n_reads + w->opt->batch_size - 1) / w->opt->batch_size; // 1. 计算哪些read需要matesw @@ -773,17 +775,20 @@ void gen_paired_sam(mem_worker_t* w) { // 2. 收集每个线程中的msw任务 PROF_START(gather_matesw_task); - gather_matesw_task(w, w->msw->t_msw_tasks_u8, &w->msw->p_msw_tasks_u8); - gather_matesw_task(w, w->msw->t_msw_tasks_i16, &w->msw->p_msw_tasks_i16); + gather_matesw_task(w, w->msw->t_msw_tasks); PROF_END(gprof[G_gather_matesw_task], gather_matesw_task); PROF_START(calc_matesw); - int msw_batch_n = (w->msw->p_msw_tasks_u8.n + w->opt->msw_batch_size - 1) / w->opt->msw_batch_size; + int msw_batch_n = 0; // 3. 处理msw任务 - if (w->msw->p_msw_tasks_u8.n > 0) + if (w->msw->p_msw_tasks_u8.n > 0) { + msw_batch_n = (w->msw->p_msw_tasks_u8.n + w->opt->msw_batch_size - 1) / w->opt->msw_batch_size; kt_for(w->opt->n_threads, worker_calc_matesw_avx512_u8, w, msw_batch_n); - if (w->msw->p_msw_tasks_i16.n > 0) + } + if (w->msw->p_msw_tasks_i16.n > 0) { + msw_batch_n = (w->msw->p_msw_tasks_i16.n + w->opt->msw_batch_size - 1) / w->opt->msw_batch_size; kt_for(w->opt->n_threads, worker_calc_matesw_avx512_i16, w, msw_batch_n); + } PROF_END(gprof[G_calc_matesw], calc_matesw); // 4. 生成sam @@ -791,5 +796,5 @@ void gen_paired_sam(mem_worker_t* w) { kt_for(w->opt->n_threads, workder_gen_sam, w, batch_n); PROF_END(gprof[G_gen_sam], gen_sam); } - fprintf(stderr, "zzh %d : 8: %ld 16: %ld\n", i, w->msw->p_msw_tasks_u8.n, w->msw->p_msw_tasks_i16.n); + fprintf(stderr, "zzh : u8: %ld i16: %ld\n", w->msw->p_msw_tasks_u8.n, w->msw->p_msw_tasks_i16.n); } \ No newline at end of file diff --git a/run.sh b/run.sh index fd7d398..eba86de 100755 --- a/run.sh +++ b/run.sh @@ -1,11 +1,11 @@ -thread=64 +thread=1 make clean; make -j 32 -n1=~/data/dataset/D2/n1.fq.gz -n2=~/data/dataset/D2/n2.fq.gz -#n1=~/data/dataset/D2/d1.fq -#n2=~/data/dataset/D2/d2.fq +#n1=~/data/dataset/D2/n1.fq.gz +#n2=~/data/dataset/D2/n2.fq.gz +n1=~/data/dataset/D2/d1.fq +n2=~/data/dataset/D2/d2.fq #n1=~/data/SRR25735656_1.fastq.gz #n2=~/data/SRR25735656_2.fastq.gz @@ -17,8 +17,8 @@ reference=~/data/reference/fmt/human_g1k_v37_decoy.fasta #out=/dev/null #out=./oldsam-D2-out.sam -out=./D2-1.sam -#out=./d.sam +#out=./D2-1.sam +out=./d.sam prog=./fastalign #prog=/home/zzh/fastbwa