From 1e3965cb7d07a3605c753633721ba9c159f9c700 Mon Sep 17 00:00:00 2001 From: zzh Date: Sun, 24 Mar 2024 04:40:09 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E4=BA=86=E5=8F=AF=E5=90=8C?= =?UTF-8?q?=E6=97=B6=E8=AF=BB=E5=86=99=E7=9A=84pipeline=EF=BC=8C=E4=BC=98?= =?UTF-8?q?=E5=8C=96=E4=BA=86=E6=97=B6=E9=97=B4=E7=BB=9F=E8=AE=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .vscode/launch.json | 4 +- .vscode/settings.json | 8 +- Makefile | 14 +- bwa.c | 66 +++++-- bwa.h | 4 + bwamem.c | 254 ++++++++++++++++++------ bwamem.h | 16 +- bwamem_pair.c | 16 +- debug.sh | 6 +- fastmap.c | 446 ++++++++++++++++++++++++++++++++++++------ ksw_extend2_avx2.c | 34 ++-- run.sh | 12 +- utils.c | 4 +- utils.h | 55 +++--- yarn.c | 398 +++++++++++++++++++++++++++++++++++++ yarn.h | 139 +++++++++++++ 16 files changed, 1281 insertions(+), 195 deletions(-) create mode 100644 yarn.c create mode 100644 yarn.h diff --git a/.vscode/launch.json b/.vscode/launch.json index e19ece5..6bff252 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -18,8 +18,8 @@ "-R", "'@RG\\tID:normal\\tSM:normal\\tPL:illumina\\tLB:normal\\tPG:bwa'", "~/reference/human_g1k_v37_decoy.fasta", - "~/data/fastq/ds/d1_1.fq", - "~/data/fastq/ds/d1_2.fq", + "~/data/fastq/dataset/na12878_wes_144/SRR25735653_1.fastq", + "~/data/fastq/dataset/na12878_wes_144/SRR25735653_2.fastq", "-o", "/dev/null" ], diff --git a/.vscode/settings.json b/.vscode/settings.json index 5d14020..9fbfc77 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -10,6 +10,12 @@ "limits": "c", "bit": "c", "numeric": "c", - "typeinfo": "c" + "typeinfo": "c", + "yarn.h": "c", + "malloc_wrap.h": "c", + "emmintrin.h": "c", + "bwamem.h": "c", + "utils.h": "c", + "stdio.h": "c" } } \ No newline at end of file diff --git a/Makefile b/Makefile index 0b7ae7c..4ed8d89 100644 --- a/Makefile +++ b/Makefile @@ -3,12 +3,18 @@ CC= gcc # CFLAGS= -g -Wall -Wno-unused-function -O2 CFLAGS= -g -Wall -Wno-unused-function -mavx2 -O2 WRAP_MALLOC=-DUSE_MALLOC_WRAPPERS -SHOW_PERF= #-DSHOW_PERF -FILTER_FULL_MATCH=#-DFILTER_FULL_MATCH + +SHOW_PERF= -DSHOW_PERF +SHOW_DATA_PERF= #-DSHOW_DATA_PERF +DEBUG_OUTPUT= #-DDEBUG_OUTPUT +DEBUG_SW_EXTEND= #-DDEBUG_SW_EXTEND +FILTER_FULL_MATCH= #-DFILTER_FULL_MATCH +USE_MT_READ= #-DUSE_MT_READ + AR= ar -DFLAGS= -DHAVE_PTHREAD $(WRAP_MALLOC) $(SHOW_PERF) $(FILTER_FULL_MATCH) -DUSE_AVX2 -DKSW_EQUAL -DUSE_MT_READ +DFLAGS= -DHAVE_PTHREAD $(WRAP_MALLOC) $(SHOW_PERF) $(SHOW_DATA_PERF) $(DEBUG_OUTPUT) $(DEBUG_SW_EXTEND) $(FILTER_FULL_MATCH) $(USE_MT_READ) -DUSE_AVX2 -DKSW_EQUAL LOBJS= utils.o kthread.o kstring.o ksw.o bwt.o bntseq.o bwa.o bwamem.o bwamem_pair.o bwamem_extra.o malloc_wrap.o \ - QSufSort.o bwt_gen.o rope.o rle.o is.o bwtindex.o + QSufSort.o bwt_gen.o rope.o rle.o is.o bwtindex.o yarn.o AOBJS= bwashm.o bwase.o bwaseqio.o bwtgap.o bwtaln.o bamlite.o \ bwape.o kopen.o pemerge.o maxk.o \ bwtsw2_core.o bwtsw2_main.o bwtsw2_aux.o bwt_lite.o \ diff --git a/bwa.c b/bwa.c index a337995..4c43045 100644 --- a/bwa.c +++ b/bwa.c @@ -118,10 +118,10 @@ static void *thread_bseq_read(void *data) { return 0; } -#define READ_ONE_SEQ(ks) \ - trim_readno(&ks->name); \ - kseq2bseq1(ks, &seqs[n], copy_comment); \ - seqs[n].id = n; \ +#define READ_ONE_SEQ(ksin) \ + trim_readno(&(ksin)->name); \ + kseq2bseq1(ksin, &seqs[n], copy_comment); \ + seqs[n].id = n; \ size += seqs[n++].l_seq; // multi thread reading input seqs @@ -132,11 +132,13 @@ void bseq_read_pe_mt(int chunk_size, int *n_, void *ks1_, void *ks2_, int copy_c bseq1_t *seqs = *seqs_ptr; read_data_t d[2]; pthread_t tid[2]; - + const int chunk_size_narrow = 4 * 1024 * 1024; + const int init_n_reads = 20; if (m == 0) { // 还没开辟空间,要初始化 - seqs = calloc(20, sizeof(bseq1_t)); // 先读取20个reads,根据reads的长度和chunk size决定要读取多少条reads + seqs = calloc(init_n_reads, sizeof(bseq1_t)); // 先读取20个reads,根据reads的长度和chunk size决定要读取多少条reads +#if 1 int ks1_ret = 0, ks2_ret = 0; - int i = 10; + int i = init_n_reads >> 1; while (i-- > 0) { ks1_ret = kseq_read(ks); if (ks1_ret < 0) break; @@ -155,31 +157,60 @@ void bseq_read_pe_mt(int chunk_size, int *n_, void *ks1_, void *ks2_, int copy_c *n_ = n; *seqs_ptr = seqs; return; } - // 重新开辟空间 - m = (chunk_size + size / 10 - 1) / (size / 10) * 2; - *m_ = m; + m = (chunk_size + size / init_n_reads - 1) / (size / init_n_reads); +#else + m = 50000; +#endif seqs = realloc(seqs, m * sizeof(bseq1_t)); - memset(seqs + n, 0, sizeof(bseq1_t)); - *seqs_ptr = seqs; + memset(seqs + n, 0, sizeof(bseq1_t) * (m - n)); } d[0].copy_comment = copy_comment; d[1].copy_comment = copy_comment; d[0].ks = ks ; d[0].seq = &seqs[0]; d[0].n_bound = (m >> 1) - (n>>1); d[0].start_pos = n; d[1].ks = ks2; d[1].seq = &seqs[0]; d[1].n_bound = (m >> 1) - (n>>1); d[1].start_pos = n+1; - d[0].chunk_size = d[1].chunk_size = (chunk_size + 1) >> 1; + d[0].chunk_size = d[1].chunk_size = (chunk_size - chunk_size_narrow - size) >> 1; pthread_create(&tid[0], 0, thread_bseq_read, &d[0]); pthread_create(&tid[1], 0, thread_bseq_read, &d[1]); pthread_join(tid[0], 0); pthread_join(tid[1], 0); size += d[0].ret_size + d[1].ret_size; + + // 如果两个线程读入的reads数量不一致 + if (d[0].ret_n < d[1].ret_n) + { + int num_to_read = d[1].ret_n - d[0].ret_n; + int offset = n + d[0].ret_n * 2; + while (num_to_read-- > 0 && kseq_read(ks) >= 0) { + trim_readno(&ks->name); + kseq2bseq1(ks, &seqs[offset], copy_comment); + seqs[offset].id = offset; + size += seqs[offset].l_seq; + offset += 2; + } + d[0].ret_n = d[1].ret_n; + } + else if (d[1].ret_n < d[0].ret_n) + { + int num_to_read = d[0].ret_n - d[1].ret_n; + int offset = n + 1 + d[1].ret_n * 2; + while (num_to_read-- > 0 && kseq_read(ks2) >= 0) { + trim_readno(&ks2->name); + kseq2bseq1(ks2, &seqs[offset], copy_comment); + seqs[offset].id = offset; + size += seqs[offset].l_seq; + offset += 2; + } + d[1].ret_n = d[0].ret_n; + } + n += d[0].ret_n + d[1].ret_n; if (size == 0 && kseq_read(ks2) >= 0) { // test if the 2nd file is finished fprintf(stderr, "[W::%s] the 1st file has fewer sequences.\n", __func__); } else if (size < chunk_size && d[0].ret_status > 0 && d[1].ret_status > 0) { while (kseq_read(ks) >= 0) { - if (ks2 && kseq_read(ks2) < 0) { // the 2nd file has fewer reads + if (kseq_read(ks2) < 0) { // the 2nd file has fewer reads fprintf(stderr, "[W::%s] the 2nd file has fewer sequences.\n", __func__); break; } @@ -189,16 +220,13 @@ void bseq_read_pe_mt(int chunk_size, int *n_, void *ks1_, void *ks2_, int copy_c memset(seqs + n, 0, (m-n) * sizeof(bseq1_t)); } READ_ONE_SEQ(ks); - if (ks2) { - READ_ONE_SEQ(ks2); - } + READ_ONE_SEQ(ks2); if (size >= chunk_size && (n&1) == 0) break; } if (size == 0) { // test if the 2nd file is finished - if (ks2 && kseq_read(ks2) >= 0) fprintf(stderr, "[W::%s] the 1st file has fewer sequences.\n", __func__); + if (kseq_read(ks2) >= 0) fprintf(stderr, "[W::%s] the 1st file has fewer sequences.\n", __func__); } } - *n_ = n; *size_ = size; if (m > *m_) *m_ = m; *seqs_ptr = seqs; diff --git a/bwa.h b/bwa.h index 5e125a8..9cf7f78 100644 --- a/bwa.h +++ b/bwa.h @@ -65,6 +65,10 @@ typedef struct { kstring_t sam; } bseq1_t; +typedef struct { + kstring_t sam; +} seq_sam_t; + extern int bwa_verbose, bwa_dbg; extern char bwa_rg_id[256]; diff --git a/bwamem.c b/bwamem.c index 7143efb..2bebd93 100644 --- a/bwamem.c +++ b/bwamem.c @@ -30,6 +30,7 @@ #include #include #include +#include #ifdef HAVE_PTHREAD #include #endif @@ -127,7 +128,12 @@ static void mem_collect_intv(const mem_opt_t *opt, const FMTIndex *fmt, int len, a->mem.n = 0; #ifdef SHOW_PERF - int64_t tmp_time = realtime_msec(); + uint64_t tmp_time1, tmp_time2; +#if USE_RDTSC + tmp_time1 = __rdtsc(); +#else + tmp_time1 = realtime_msec(); +#endif #endif // 1. first pass: find all SMEMs while (x < len) { @@ -149,8 +155,13 @@ static void mem_collect_intv(const mem_opt_t *opt, const FMTIndex *fmt, int len, } } #ifdef SHOW_PERF - a->time_seed_1 += realtime_msec() - tmp_time; - tmp_time = realtime_msec(); +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); +#endif + a->time_seed_1 += tmp_time2 - tmp_time1; + tmp_time1 = tmp_time2; #endif #ifdef FILTER_FULL_MATCH @@ -173,9 +184,15 @@ static void mem_collect_intv(const mem_opt_t *opt, const FMTIndex *fmt, int len, } } #ifdef SHOW_PERF - a->time_seed_2 = realtime_msec() - tmp_time; - tmp_time = realtime_msec(); +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); #endif + a->time_seed_2 += tmp_time2 - tmp_time1; + tmp_time1 = tmp_time2; +#endif + // 3. third pass: LAST-like if (opt->max_mem_intv > 0) { x = 0; @@ -190,7 +207,12 @@ static void mem_collect_intv(const mem_opt_t *opt, const FMTIndex *fmt, int len, } } #ifdef SHOW_PERF - a->time_seed_3 += realtime_msec() - tmp_time; +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); +#endif + a->time_seed_3 += tmp_time2 - tmp_time1; #endif #ifdef FILTER_FULL_MATCH @@ -350,7 +372,12 @@ mem_chain_v mem_chain(const mem_opt_t *opt, const FMTIndex *fmt, const bntseq_t for (k = count = 0; k < p->x[2] && count < opt->max_occ; k += step, ++count) { mem_seed_t s; #ifdef SHOW_PERF - int64_t tmp_time = realtime_msec(); + uint64_t tmp_time1, tmp_time2; +#if USE_RDTSC + tmp_time1 = __rdtsc(); +#else + tmp_time1 = realtime_msec(); +#endif #endif s.rbeg = fmt_sa(fmt, p->x[0] + k); //kv_push(uint64_t, v1, s.rbeg); @@ -369,9 +396,14 @@ mem_chain_v mem_chain(const mem_opt_t *opt, const FMTIndex *fmt, const bntseq_t // s.rbeg = (fmt->l_pac << 1) - slen - s.rbeg; //} //s.rbeg = fmt_sa(fmt, p->x[0] + k); + #ifdef SHOW_PERF - tmp_time = realtime_msec() - tmp_time; - __sync_fetch_and_add(&time_bwt_sa, tmp_time); +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); +#endif + aux->time_sa += tmp_time2 - tmp_time1; #endif CHECK_ADD_CHAIN(tmp, lower, upper); } @@ -836,15 +868,27 @@ void mem_chain2aln(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac printf("*** Left query: "); for (j = 0; j < s->qbeg; ++j) putchar("ACGTN"[(int)query[s->qbeg - 1 - j]]); putchar('\n'); } #ifdef SHOW_PERF - int64_t tmp_time = realtime_msec(); + uint64_t tmp_time1, tmp_time2; +#if USE_RDTSC + tmp_time1 = __rdtsc(); +#else + tmp_time1 = realtime_msec(); #endif +#endif + #ifndef USE_AVX2 a->score = ksw_extend2(s->qbeg, qs, tmp, rs, 5, opt->mat, opt->o_del, opt->e_del, opt->o_ins, opt->e_ins, aw[0], opt->pen_clip5, opt->zdrop, s->len * opt->a, &qle, &tle, >le, &gscore, &max_off[0], aux->sw_buf); #else a->score = ksw_extend2_avx2(s->qbeg, query, tmp, rseq, 1, 5, opt->mat, opt->o_del, opt->e_del, opt->o_ins, opt->e_ins, opt->a, opt->b, aw[0], opt->pen_clip5, opt->zdrop, s->len * opt->a, &qle, &tle, >le, &gscore, &max_off[0], aux->sw_buf); #endif + #ifdef SHOW_PERF - aux->time_bsw += realtime_msec() - tmp_time; +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); +#endif + aux->time_bsw += tmp_time2 - tmp_time1; #endif if (bwa_verbose >= 4) { printf("*** Left extension: prev_score=%d; score=%d; bandwidth=%d; max_off_diagonal_dist=%d\n", prev, a->score, aw[0], max_off[0]); fflush(stdout); } if (a->score == prev || max_off[0] < (aw[0]>>1) + (aw[0]>>2)) break; @@ -876,7 +920,12 @@ void mem_chain2aln(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac printf("*** Right query: "); for (j = 0; j < l_query - qe; ++j) putchar("ACGTN"[(int)query[qe+j]]); putchar('\n'); } #ifdef SHOW_PERF - int64_t tmp_time = realtime_msec(); + uint64_t tmp_time1, tmp_time2; +#if USE_RDTSC + tmp_time1 = __rdtsc(); +#else + tmp_time1 = realtime_msec(); +#endif #endif #ifndef USE_AVX2 a->score = ksw_extend2(l_query - qe, query + qe, rmax[1] - rmax[0] - re, rseq + re, 5, opt->mat, opt->o_del, opt->e_del, opt->o_ins, opt->e_ins, aw[1], opt->pen_clip3, opt->zdrop, sc0, &qle, &tle, >le, &gscore, &max_off[1], aux->sw_buf); @@ -884,7 +933,12 @@ void mem_chain2aln(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac a->score = ksw_extend2_avx2(l_query - qe, query + qe, rmax[1] - rmax[0] - re, rseq + re, 0, 5, opt->mat, opt->o_del, opt->e_del, opt->o_ins, opt->e_ins, opt->a, opt->b, aw[1], opt->pen_clip3, opt->zdrop, sc0, &qle, &tle, >le, &gscore, &max_off[1], aux->sw_buf); #endif #ifdef SHOW_PERF - aux->time_bsw += realtime_msec() - tmp_time; +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); +#endif + aux->time_bsw += tmp_time2 - tmp_time1; #endif if (bwa_verbose >= 4) { printf("*** Right extension: prev_score=%d; score=%d; bandwidth=%d; max_off_diagonal_dist=%d\n", prev, a->score, aw[1], max_off[1]); fflush(stdout); } if (a->score == prev || max_off[1] < (aw[1]>>1) + (aw[1]>>2)) break; @@ -1134,7 +1188,7 @@ void mem_reorder_primary5(int T, mem_alnreg_v *a) } // TODO (future plan): group hits into a uint64_t[] array. This will be cleaner and more flexible -void mem_reg2sam(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, bseq1_t *s, mem_alnreg_v *a, int extra_flag, const mem_aln_t *m) +void mem_reg2sam(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, bseq1_t *s, mem_alnreg_v *a, int extra_flag, const mem_aln_t *m, seq_sam_t *ss) { extern char **mem_gen_alt(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, mem_alnreg_v *a, int l_query, const char *query); // kstring_t str; @@ -1146,7 +1200,7 @@ void mem_reg2sam(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, XA = mem_gen_alt(opt, bns, pac, a, s->l_seq, s->seq); kv_init(aa); // str.l = str.m = 0; str.s = 0; - s->sam.l = 0; + ss->sam.l = 0; for (k = l = 0; k < a->n; ++k) { mem_alnreg_t *p = &a->a[k]; mem_aln_t *q; @@ -1169,10 +1223,10 @@ void mem_reg2sam(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, mem_aln_t t; t = mem_reg2aln(opt, bns, pac, s->l_seq, s->seq, 0); t.flag |= extra_flag; - mem_aln2sam(opt, bns, &s->sam, s, 1, &t, 0, m); + mem_aln2sam(opt, bns, &ss->sam, s, 1, &t, 0, m); } else { for (k = 0; k < aa.n; ++k) - mem_aln2sam(opt, bns, &s->sam, s, aa.n, aa.a, k, m); + mem_aln2sam(opt, bns, &ss->sam, s, aa.n, aa.a, k, m); for (k = 0; k < aa.n; ++k) free(aa.a[k].cigar); free(aa.a); } @@ -1309,18 +1363,18 @@ static void worker1(void *data, int i, int tid) static void worker2(void *data, int i, int tid) { - extern int mem_sam_pe(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_pestat_t pes[4], uint64_t id, bseq1_t s[2], mem_alnreg_v a[2]); + extern int mem_sam_pe(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_pestat_t pes[4], uint64_t id, bseq1_t s[2], mem_alnreg_v a[2], seq_sam_t ss[2]); extern void mem_reg2ovlp(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, bseq1_t *s, mem_alnreg_v *a); mem_worker_t *w = (mem_worker_t*)data; if (!(w->opt->flag&MEM_F_PE)) { if (bwa_verbose >= 4) printf("=====> Finalizing read '%s' <=====\n", w->seqs[i].name); mem_mark_primary_se(w->opt, w->regs[i].n, w->regs[i].a, w->n_processed + i); if (w->opt->flag & MEM_F_PRIMARY5) mem_reorder_primary5(w->opt->T, &w->regs[i]); - mem_reg2sam(w->opt, w->bns, w->pac, &w->seqs[i], &w->regs[i], 0, 0); + mem_reg2sam(w->opt, w->bns, w->pac, &w->seqs[i], &w->regs[i], 0, 0, &w->sams[i]); free(w->regs[i].a); } else { if (bwa_verbose >= 4) printf("=====> Finalizing read pair '%s' <=====\n", w->seqs[i<<1|0].name); - mem_sam_pe(w->opt, w->bns, w->pac, w->pes, (w->n_processed>>1) + i, &w->seqs[i<<1], &w->regs[i<<1]); + mem_sam_pe(w->opt, w->bns, w->pac, w->pes, (w->n_processed>>1) + i, &w->seqs[i<<1], &w->regs[i<<1], &w->sams[i<<1]); free(w->regs[i<<1|0].a); free(w->regs[i<<1|1].a); } } @@ -1381,7 +1435,12 @@ static void mem_collect_intv_batch(const mem_opt_t *opt, const FMTIndex *fmt, in smem->mem.n = 0; #ifdef SHOW_PERF - int64_t tmp_time = realtime_msec(); + uint64_t tmp_time1, tmp_time2; +#if USE_RDTSC + tmp_time1 = __rdtsc(); +#else + tmp_time1 = realtime_msec(); +#endif #endif // 1. first pass: find all SMEMs //fprintf(gfp1, "read\n"); @@ -1405,12 +1464,21 @@ static void mem_collect_intv_batch(const mem_opt_t *opt, const FMTIndex *fmt, in if (start_flag) ++start_N_num; } } -#ifdef SHOW_PERF - a->time_seed_1 += realtime_msec() - tmp_time; - tmp_time = realtime_msec(); - s2n += 1; + +#ifdef SHOW_DATA_PERF + gdat[0] += 1; // read num ++ if (max_seed_len == len - start_N_num) - s1n += 1; + gdat[1] += 1; // seed-1 full match num ++ +#endif + +#ifdef SHOW_PERF +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); +#endif + a->time_seed_1 += tmp_time2 - tmp_time1; + tmp_time1 = tmp_time2; #endif #ifdef FILTER_FULL_MATCH @@ -1435,10 +1503,17 @@ static void mem_collect_intv_batch(const mem_opt_t *opt, const FMTIndex *fmt, in } // fprintf(gfp1, "end\n"); } + #ifdef SHOW_PERF - a->time_seed_2 += realtime_msec() - tmp_time; - tmp_time = realtime_msec(); +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); #endif + a->time_seed_2 += tmp_time2 - tmp_time1; + tmp_time1 = tmp_time2; +#endif + // 3. third pass: LAST-like if (opt->max_mem_intv > 0) { x = 0; @@ -1453,8 +1528,14 @@ static void mem_collect_intv_batch(const mem_opt_t *opt, const FMTIndex *fmt, in } else ++x; } } + #ifdef SHOW_PERF - a->time_seed_3 += realtime_msec() - tmp_time; +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); +#endif + a->time_seed_3 += tmp_time2 - tmp_time1; #endif #ifdef FILTER_FULL_MATCH @@ -1490,19 +1571,31 @@ void find_smem(const mem_opt_t *opt, const FMTIndex *fmt, int len, const uint8_t for (k = count = 0; k < p->x[2] && count < opt->max_occ; k += step, ++count) { #ifdef SHOW_PERF - int64_t tmp_time = realtime_msec(); + uint64_t tmp_time1, tmp_time2; +#if USE_RDTSC + tmp_time1 = __rdtsc(); +#else + tmp_time1 = realtime_msec(); #endif +#endif + uint64_t pos = fmt_sa_offset(fmt, p->x[0] + k); // this is the base coordinate in the forward-reverse reference + #ifdef SHOW_PERF - aux->time_sa += realtime_msec() - tmp_time; +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); #endif + aux->time_sa += tmp_time2 - tmp_time1; +#endif + kv_push(uint64_t, smemv->pos_arr, pos); } } } } - void generate_chain(const mem_opt_t *opt, const FMTIndex *fmt, const bntseq_t *bns, int len, const uint8_t *seq, smem_v *smemv, mem_chain_v *chain) { int i, j, b, e, l_rep; @@ -1615,6 +1708,15 @@ void mem_core_process(const mem_opt_t *opt, const FMTIndex *fmt, const bntseq_t mem_alnreg_v *regp; char *seq; +#ifdef SHOW_PERF + uint64_t tmp_time1, tmp_time2; +#if USE_RDTSC + tmp_time1 = __rdtsc(); +#else + tmp_time1 = realtime_msec(); +#endif +#endif + // 1. seeding for (i = 0; i < nseq; ++i) { seq = seq_arr[i].seq; @@ -1625,6 +1727,16 @@ void mem_core_process(const mem_opt_t *opt, const FMTIndex *fmt, const bntseq_t find_smem(opt, fmt, l_seq, (uint8_t *)seq, aux, &smem_arr[i]); } +#ifdef SHOW_PERF +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); +#endif + aux->time_seed_all += tmp_time2 - tmp_time1; + tmp_time1 = tmp_time2; +#endif + // 2. chain for (i=0; i= 4) mem_print_chain(bns, chnp); } +#ifdef SHOW_PERF +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); +#endif + aux->time_chain += tmp_time2 - tmp_time1; + tmp_time1 = tmp_time2; +#endif + // 3. align for (i=0; in; ++j) { mem_chain_t *p = &chnp->a[j]; if (bwa_verbose >= 4) err_printf("* ---> Processing chain(%d) <---\n", j); mem_chain2aln(opt, bns, pac, l_seq, (uint8_t*)seq, p, regp, aux); free(chnp->a[j].seeds); } -//#ifdef SHOW_PERF -// tmp_time = realtime_msec() - tmp_time; -// __sync_fetch_and_add(&time_ksw_extend2, tmp_time); -//#endif + free(chnp->a); chnp->m = 0; chnp->a = 0; regp->n = mem_sort_dedup_patch(opt, bns, pac, (uint8_t *)seq, regp->n, regp->a); if (bwa_verbose >= 4) { @@ -1670,13 +1787,17 @@ void mem_core_process(const mem_opt_t *opt, const FMTIndex *fmt, const bntseq_t if (p->rid >= 0 && bns->anns[p->rid].is_alt) p->is_alt = 1; } -#ifdef SHOW_PERF - tmp_time = realtime_msec() - tmp_time; - __sync_fetch_and_add(&time_ksw_extend2, tmp_time); -#endif - } +#ifdef SHOW_PERF +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); +#endif + aux->time_bsw_all += tmp_time2 - tmp_time1; +#endif + #if 1 // 4. calc insert size #define MIN_RATIO 0.8 @@ -1708,7 +1829,7 @@ static void worker_smem_align(void *data, int i, int tid) static void worker_sam(void *data, int i, int tid) { - extern int mem_sam_pe(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_pestat_t pes[4], uint64_t id, bseq1_t s[2], mem_alnreg_v a[2]); + extern int mem_sam_pe(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_pestat_t pes[4], uint64_t id, bseq1_t s[2], mem_alnreg_v a[2], seq_sam_t ss[2]); extern void mem_reg2ovlp(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, bseq1_t *s, mem_alnreg_v *a); extern void mem_matesw_batch(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_pestat_t pes[4], bseq1_t *s, mem_alnreg_v *a, int data_size); mem_worker_t *w = (mem_worker_t*)data; @@ -1720,7 +1841,7 @@ static void worker_sam(void *data, int i, int tid) if (bwa_verbose >= 4) printf("=====> Finalizing read '%s' <=====\n", w->seqs[i].name); mem_mark_primary_se(w->opt, w->regs[i].n, w->regs[i].a, w->n_processed + i); if (w->opt->flag & MEM_F_PRIMARY5) mem_reorder_primary5(w->opt->T, &w->regs[i]); - mem_reg2sam(w->opt, w->bns, w->pac, &w->seqs[i], &w->regs[i], 0, 0); + mem_reg2sam(w->opt, w->bns, w->pac, &w->seqs[i], &w->regs[i], 0, 0, &w->sams[i]); free(w->regs[i].a); } } else { @@ -1730,13 +1851,13 @@ static void worker_sam(void *data, int i, int tid) for (i=start; i= 4) printf("=====> Finalizing read pair '%s' <=====\n", w->seqs[i].name); - mem_sam_pe(w->opt, w->bns, w->pac, w->pes, (w->n_processed+i)>>1, &w->seqs[i], &w->regs[i]); + mem_sam_pe(w->opt, w->bns, w->pac, w->pes, (w->n_processed+i)>>1, &w->seqs[i], &w->regs[i], &w->sams[i]); free(w->regs[i].a); free(w->regs[i+1].a); } } } -void mem_process_seqs(const mem_opt_t *opt, mem_worker_t *w, int64_t n_processed, int n, bseq1_t *seqs, const mem_pestat_t *pes0) +void mem_process_seqs(const mem_opt_t *opt, mem_worker_t *w, int64_t n_processed, int n, bseq1_t *seqs, const mem_pestat_t *pes0, seq_sam_t *sams) { extern void kt_for(int n_threads, void (*func)(void*,int,int), void *data, int n); mem_pestat_t pes[4]; @@ -1751,8 +1872,9 @@ void mem_process_seqs(const mem_opt_t *opt, mem_worker_t *w, int64_t n_processed w->n = n; w->regs = realloc(w->regs, n * sizeof(mem_alnreg_v)); } - w->seqs = seqs; w->n_processed = n_processed; + w->seqs = seqs; w->n_processed = n_processed; w->sams = sams; w->n_reads = n; w->pes = &pes[0]; + #if 1 if ((opt->flag & MEM_F_PE) && !pes0) { // infer insert sizes if not provided int i, j; @@ -1762,25 +1884,47 @@ void mem_process_seqs(const mem_opt_t *opt, mem_worker_t *w, int64_t n_processed w->isize_arr[i][j].n = 0; } #endif + #ifdef SHOW_PERF - int64_t tmp_time = realtime_msec(); + uint64_t tmp_time1, tmp_time2; +#if USE_RDTSC + tmp_time1 = __rdtsc(); +#else + tmp_time1 = realtime_msec(); #endif +#endif + //kt_for(opt->n_threads, worker1, w, (opt->flag&MEM_F_PE)? n>>1 : n); // find mapping positions kt_for(opt->n_threads, worker_smem_align, w, n_batch); // find mapping positions -#ifdef SHOW_PERF - time_work1 += realtime_msec() - tmp_time; - tmp_time = realtime_msec(); -#endif + if (opt->flag&MEM_F_PE) { // infer insert sizes if not provided if (pes0) memcpy(pes, pes0, 4 * sizeof(mem_pestat_t)); // if pes0 != NULL, set the insert-size distribution as pes0 else mem_pestat(opt, w->bns->l_pac, n, w->isize_arr, pes); // otherwise, infer the insert size distribution from data //else mem_pestat_old(opt, w->bns->l_pac, n, w->regs, pes); } + +#ifdef SHOW_PERF +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); +#endif + time_work_kernel += tmp_time2 - tmp_time1; + tmp_time1 = tmp_time2; +#endif + kt_for(opt->n_threads, worker2, w, (opt->flag&MEM_F_PE)? n>>1 : n); // generate alignment //kt_for(opt->n_threads, worker_sam, w, n_batch); + #ifdef SHOW_PERF - time_work2 += realtime_msec() - tmp_time; +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); #endif + time_work_sam += tmp_time2 - tmp_time1; +#endif + //free(w.regs); if (bwa_verbose >= 3) fprintf(stderr, "[M::%s] Processed %d reads in %.3f CPU sec, %.3f real sec\n", __func__, n, cputime() - ctime, realtime() - rtime); diff --git a/bwamem.h b/bwamem.h index cd5c93b..1149658 100644 --- a/bwamem.h +++ b/bwamem.h @@ -145,11 +145,14 @@ typedef struct { size_t n, m; mem_chain_t *a; } mem_chain_v; typedef struct { bwtintv_v mem, mem1, *tmpv[2]; buf_t *sw_buf, *seq_buf; - int64_t time_seed_1; - int64_t time_seed_2; - int64_t time_seed_3; - int64_t time_sa; - int64_t time_bsw; + uint64_t time_seed_1; + uint64_t time_seed_2; + uint64_t time_seed_3; + uint64_t time_seed_all; + uint64_t time_chain; + uint64_t time_sa; + uint64_t time_bsw; + uint64_t time_bsw_all; } smem_aux_t; typedef struct { @@ -167,6 +170,7 @@ typedef struct { const mem_pestat_t *pes; smem_aux_t **aux; bseq1_t *seqs; + seq_sam_t *sams; smem_v **smem_arr; mem_chain_v **chain_arr; mem_alnreg_v *regs; @@ -212,7 +216,7 @@ extern "C" { * @param pes0 insert-size info; if NULL, infer from data; if not NULL, it should be an array with 4 elements, * corresponding to each FF, FR, RF and RR orientation. See mem_pestat() for more info. */ - void mem_process_seqs(const mem_opt_t *opt, mem_worker_t *w, int64_t n_processed, int n, bseq1_t *seqs, const mem_pestat_t *pes0); + void mem_process_seqs(const mem_opt_t *opt, mem_worker_t *w, int64_t n_processed, int n, bseq1_t *seqs, const mem_pestat_t *pes0, seq_sam_t *sams); /** * Find the aligned regions for one query sequence diff --git a/bwamem_pair.c b/bwamem_pair.c index 5855779..eedd521 100644 --- a/bwamem_pair.c +++ b/bwamem_pair.c @@ -547,11 +547,11 @@ void mem_reorder_primary5(int T, mem_alnreg_v *a); #define raw_mapq(diff, a) ((int)(6.02 * (diff) / (a) + .499)) -int mem_sam_pe(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_pestat_t pes[4], uint64_t id, bseq1_t s[2], mem_alnreg_v a[2]) +int mem_sam_pe(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_pestat_t pes[4], uint64_t id, bseq1_t s[2], mem_alnreg_v a[2], seq_sam_t ss[2]) { extern int mem_mark_primary_se(const mem_opt_t *opt, int n, mem_alnreg_t *a, int64_t id); extern int mem_approx_mapq_se(const mem_opt_t *opt, const mem_alnreg_t *a); - extern void mem_reg2sam(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, bseq1_t *s, mem_alnreg_v *a, int extra_flag, const mem_aln_t *m); + extern void mem_reg2sam(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, bseq1_t *s, mem_alnreg_v *a, int extra_flag, const mem_aln_t *m, seq_sam_t *ss); extern char **mem_gen_alt(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_alnreg_v *a, int l_query, const char *query); int n = 0, i, j, z[2], o, subo, n_sub, extra_flag = 1, n_pri[2], n_aa[2]; @@ -651,12 +651,12 @@ int mem_sam_pe(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, co aa[i][n_aa[i]++] = g[i]; } } - s[0].sam.l = 0; + ss[0].sam.l = 0; for (i = 0; i < n_aa[0]; ++i) - mem_aln2sam(opt, bns, &s[0].sam, &s[0], n_aa[0], aa[0], i, &h[1]); // write read1 hits - s[1].sam.l = 0; + mem_aln2sam(opt, bns, &ss[0].sam, &s[0], n_aa[0], aa[0], i, &h[1]); // write read1 hits + ss[1].sam.l = 0; for (i = 0; i < n_aa[1]; ++i) - mem_aln2sam(opt, bns, &s[1].sam, &s[1], n_aa[1], aa[1], i, &h[0]); // write read2 hits + mem_aln2sam(opt, bns, &ss[1].sam, &s[1], n_aa[1], aa[1], i, &h[0]); // write read2 hits if (strcmp(s[0].name, s[1].name) != 0) err_fatal(__func__, "paired reads have different names: \"%s\", \"%s\"\n", s[0].name, s[1].name); // free for (i = 0; i < 2; ++i) { @@ -685,8 +685,8 @@ no_pairing: d = mem_infer_dir(bns->l_pac, a[0].a[0].rb, a[1].a[0].rb, &dist); if (!pes[d].failed && dist >= pes[d].low && dist <= pes[d].high) extra_flag |= 2; } - mem_reg2sam(opt, bns, pac, &s[0], &a[0], 0x41|extra_flag, &h[1]); - mem_reg2sam(opt, bns, pac, &s[1], &a[1], 0x81|extra_flag, &h[0]); + mem_reg2sam(opt, bns, pac, &s[0], &a[0], 0x41|extra_flag, &h[1], &ss[0]); + mem_reg2sam(opt, bns, pac, &s[1], &a[1], 0x81|extra_flag, &h[0], &ss[1]); if (strcmp(s[0].name, s[1].name) != 0) err_fatal(__func__, "paired reads have different names: \"%s\", \"%s\"\n", s[0].name, s[1].name); free(h[0].cigar); free(h[1].cigar); return n; diff --git a/debug.sh b/debug.sh index 709d0e1..0534657 100755 --- a/debug.sh +++ b/debug.sh @@ -1,6 +1,8 @@ thread=1 -n_r1=~/data/fastq/dataset/na12878_wes_144/diff_r1.fq -n_r2=~/data/fastq/dataset/na12878_wes_144/diff_r2.fq +n_r1=~/data/fastq/dataset/na12878_wgs_150/diff_r1.fq +n_r2=~/data/fastq/dataset/na12878_wgs_150/diff_r2.fq +#n_r1=~/data/fastq/dataset/na12878_wes_144/diff_r1.fq +#n_r2=~/data/fastq/dataset/na12878_wes_144/diff_r2.fq #n_r1=~/data/fastq/ZY2105177532213000/n_r1.fq #n_r2=~/data/fastq/ZY2105177532213000/n_r2.fq #n_r1=~/data/fastq/ZY2105177532213000/sn_r1.fq diff --git a/fastmap.c b/fastmap.c index 625b89c..6609a1f 100644 --- a/fastmap.c +++ b/fastmap.c @@ -32,6 +32,8 @@ #include #include #include +#include +#include #include "bwa.h" #include "bwamem.h" #include "kvec.h" @@ -39,11 +41,12 @@ #include "bntseq.h" #include "kseq.h" #include "fmt_idx.h" +#include "yarn.h" KSEQ_DECLARE(gzFile) // 记录运行时间的变量 +/* #ifdef SHOW_PERF - int64_t time_ksw_extend2 = 0, time_ksw_global2 = 0, time_ksw_align2 = 0, @@ -65,8 +68,31 @@ int64_t dn = 0, n16 = 0, n17 = 0, n18 = 0, n19 = 0, nall = 0, num_sa = 0; int64_t s1n = 0, s2n = 0, s3n = 0, s1l = 0, s2l = 0, s3l = 0; int64_t gn[100] = {0}; int64_t g_num_smem2 = 0; -FILE *gfp1, *gfp2, *gfp3; +#endif +*/ +#ifdef SHOW_PERF +uint64_t time_process_data = 0, time_read = 0, time_write = 0, time_compute = 0, + time_seed_1 = 0, time_seed_2 = 0, time_seed_3 = 0, + time_seed_sa = 0, time_seed_chain = 0, time_seed_all = 0, + time_bsw = 0, time_bsw_all = 0, + time_work_kernel = 0, time_work_sam = 0, + time_load_idx = 0; + +uint64_t proc_freq = 1000; + +#endif + +#ifdef SHOW_DATA_PERF +/* +gdat[0]: read nums +gdat[1]: seed-1 full match nums +*/ +int64_t gdat[100]; +#endif + +#ifdef DEBUG_OUTPUT + FILE * gfp1, *gfp2, *gfp3; #endif extern unsigned char nst_nt4_table[256]; @@ -77,8 +103,11 @@ void kt_pipeline(int n_threads, void *(*func)(void*, int, void*), void *shared_d typedef struct { int n_seqs; + int n_sams; int m_seqs; + int m_sams; bseq1_t *seqs; + seq_sam_t *sams; } ktp_data_t; typedef struct { @@ -91,60 +120,277 @@ typedef struct { mem_worker_t *w; int data_idx; // pingpong buffer index ktp_data_t *data; + volatile int read_complete; + volatile int calc_complete; + long read_idx; + long calc_idx; + long write_idx; } ktp_aux_t; +// read +static inline void *read_data(ktp_aux_t *aux, ktp_data_t *data) +{ + +#ifdef SHOW_PERF + int64_t tmp_time1, tmp_time2; +#if USE_RDTSC + tmp_time1 = __rdtsc(); +#else + tmp_time1 = realtime_msec(); +#endif +#endif + + ktp_data_t *ret = aux->data + aux->data_idx; + aux->data_idx = !aux->data_idx; + int64_t size = 0; + bseq_read(aux->actual_chunk_size, &ret->n_seqs, aux->ks, aux->ks2, aux->copy_comment, &size, &ret->m_seqs, &ret->seqs); + +#ifdef SHOW_PERF +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); +#endif + time_read += tmp_time2 - tmp_time1; +#endif + +#if 0 + // 计算内存占用 + int i; + int64_t ms = 0; + for (i = 0; i < ret->n_seqs; ++i) + { + ms += ret->seqs[i].m_name + ret->seqs[i].m_seq + ret->seqs[i].m_comment + ret->seqs[i].m_qual; + } + fprintf(stderr, "seq size: %ld M\n", ms / 1024 / 1024); +#endif + + if (ret->n_seqs == 0) { + return 0; + } + if (bwa_verbose >= 3) + fprintf(stderr, "[M::%s] read %d sequences (%ld bp)...\n", __func__, ret->n_seqs, (long)size); + return ret; +} + +// calculate +static inline void *calc_data(ktp_aux_t *aux, ktp_data_t *data) +{ +#ifdef SHOW_PERF + int64_t tmp_time1, tmp_time2; +#if USE_RDTSC + tmp_time1 = __rdtsc(); +#else + tmp_time1 = realtime_msec(); +#endif +#endif + + const mem_opt_t *opt = aux->opt; + if (data->n_sams != data->n_seqs) { + if (data->m_sams < data->m_seqs) { + data->m_sams = data->m_seqs; + data->sams = realloc(data->sams, data->m_sams * sizeof(seq_sam_t)); + memset(data->sams + data->n_sams, 0, (data->m_sams - data->n_sams) * sizeof(seq_sam_t)); + } + data->n_sams = data->n_seqs; + } + if (opt->flag & MEM_F_SMARTPE) { + int i; + fprintf(stderr, "here MEM_F_SMARTPE\n"); + bseq1_t *sep[2]; + seq_sam_t *ss[2]; + int n_sep[2]; + mem_opt_t tmp_opt = *opt; + bseq_classify(data->n_seqs, data->seqs, n_sep, sep); + ss[0] = calloc(0, n_sep[0] * sizeof(seq_sam_t)); + ss[1] = calloc(0, n_sep[1] * sizeof(seq_sam_t)); + if (bwa_verbose >= 3) + fprintf(stderr, "[M::%s] %d single-end sequences; %d paired-end sequences\n", __func__, n_sep[0], n_sep[1]); + if (n_sep[0]) { + tmp_opt.flag &= ~MEM_F_PE; + mem_process_seqs(&tmp_opt, aux->w, aux->n_processed, n_sep[0], sep[0], 0, ss[0]); + for (i = 0; i < n_sep[0]; ++i) + data->sams[sep[0][i].id].sam = ss[0][i].sam; + } + if (n_sep[1]) { + tmp_opt.flag |= MEM_F_PE; + mem_process_seqs(&tmp_opt, aux->w, aux->n_processed + n_sep[0], n_sep[1], sep[1], aux->pes0, ss[1]); + for (i = 0; i < n_sep[1]; ++i) + data->sams[sep[1][i].id].sam = ss[1][i].sam; + } + free(sep[0]); free(sep[1]); + free(ss[0]); free(ss[1]); + } else mem_process_seqs(opt, aux->w, aux->n_processed, data->n_seqs, data->seqs, aux->pes0, data->sams); + aux->n_processed += data->n_seqs; + +#ifdef SHOW_PERF +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); +#endif + time_compute += tmp_time2 - tmp_time1; +#endif + return data; +} + +// write +static inline void *write_data(ktp_aux_t *aux, ktp_data_t *data) +{ + int i; +#ifdef SHOW_PERF + int64_t tmp_time1, tmp_time2; +#if USE_RDTSC + tmp_time1 = __rdtsc(); +#else + tmp_time1 = realtime_msec(); +#endif +#endif + //int64_t ms = 0; + for (i = 0; i < data->n_sams; ++i) + { + //ms += data->sams[i].sam.m; + if (data->sams[i].sam.l) + err_fputs(data->sams[i].sam.s, stdout); + } + //fprintf(stderr, "sam size: %ld M\n", ms / 1024 / 1024); + +#ifdef SHOW_PERF +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); +#endif + time_write += tmp_time2 - tmp_time1; +#endif + return 0; +} + +// io 异步,读和写不能同时 static void *process(void *shared, int step, void *_data) { ktp_aux_t *aux = (ktp_aux_t*)shared; ktp_data_t *data = (ktp_data_t*)_data; - int i; if (step == 0) { - ktp_data_t *ret = aux->data + aux->data_idx; - aux->data_idx = !aux->data_idx; - int64_t size = 0; - bseq_read(aux->actual_chunk_size, &ret->n_seqs, aux->ks, aux->ks2, aux->copy_comment, &size, &ret->m_seqs, &ret->seqs); - if (ret->n_seqs == 0) { - return 0; - } - if (bwa_verbose >= 3) - fprintf(stderr, "[M::%s] read %d sequences (%ld bp)...\n", __func__, ret->n_seqs, (long)size); - return ret; + return read_data(aux, data); } else if (step == 1) { - const mem_opt_t *opt = aux->opt; - // const bwaidx_t *idx = aux->idx; - if (opt->flag & MEM_F_SMARTPE) { - bseq1_t *sep[2]; - int n_sep[2]; - mem_opt_t tmp_opt = *opt; - bseq_classify(data->n_seqs, data->seqs, n_sep, sep); - if (bwa_verbose >= 3) - fprintf(stderr, "[M::%s] %d single-end sequences; %d paired-end sequences\n", __func__, n_sep[0], n_sep[1]); - if (n_sep[0]) { - tmp_opt.flag &= ~MEM_F_PE; - mem_process_seqs(&tmp_opt, aux->w, aux->n_processed, n_sep[0], sep[0], 0); - for (i = 0; i < n_sep[0]; ++i) - data->seqs[sep[0][i].id].sam = sep[0][i].sam; - } - if (n_sep[1]) { - tmp_opt.flag |= MEM_F_PE; - mem_process_seqs(&tmp_opt, aux->w, aux->n_processed + n_sep[0], n_sep[1], sep[1], aux->pes0); - for (i = 0; i < n_sep[1]; ++i) - data->seqs[sep[1][i].id].sam = sep[1][i].sam; - } - free(sep[0]); free(sep[1]); - } else - mem_process_seqs(opt, aux->w, aux->n_processed, data->n_seqs, data->seqs, aux->pes0); - aux->n_processed += data->n_seqs; - return data; + return calc_data(aux, data); } else if (step == 2) { - for (i = 0; i < data->n_seqs; ++i) { - if (data->seqs[i].sam.l) err_fputs(data->seqs[i].sam.s, stdout); - } - return 0; + return write_data(aux, data); } return 0; } +////////////// 读和写可以同时进行的pipeline +static lock_t *input_have = NULL; +static lock_t *output_have = NULL; + +static void *thread_read(void *data) +{ + ktp_aux_t *aux = (ktp_aux_t *)data; + while (1) + { + POSSESS(input_have); + WAIT_FOR(input_have, NOT_TO_BE, 0); + RELEASE(input_have); + // fprintf(stderr, "start read: %ld\n", aux->read_idx); + if (read_data(aux, aux->data) == 0) { + POSSESS(input_have); + aux->read_complete = 1; + TWIST(input_have, BY, -1); + break; + } + POSSESS(input_have); + aux->read_idx++; + TWIST(input_have, BY, -1); + // fprintf(stderr, "next read: %ld\n", aux->read_idx); + } + return 0; +} + +static void *thread_calc(void *data) +{ + ktp_aux_t *aux = (ktp_aux_t *)data; + int d_idx = 0; + int add_idx = 0; + while (1) + { + // fprintf(stderr, "start calc: %ld\n", aux->calc_idx); + POSSESS(input_have); + WAIT_FOR(input_have, NOT_TO_BE, 2); + RELEASE(input_have); // 应该没必要持有吧? + + POSSESS(output_have); + WAIT_FOR(output_have, NOT_TO_BE, 2); + RELEASE(output_have); + + if (aux->calc_idx < aux->read_idx) { + calc_data(aux, aux->data + d_idx); + d_idx = !d_idx; + add_idx = 1; + } + if (aux->read_complete) + { + POSSESS(output_have); + if (add_idx) aux->calc_idx ++; + aux->calc_complete = 1; + TWIST(output_have, BY, 1); // 最后要唤醒写线程 + break; // 计算完了 + } + POSSESS(output_have); + if (add_idx) aux->calc_idx ++; + TWIST(output_have, BY, 1); + + POSSESS(input_have); + TWIST(input_have, BY, 1); + // fprintf(stderr, "next calc: %ld\n", aux->calc_idx); + } + return 0; +} + +static void *thread_write(void *data) +{ + ktp_aux_t *aux = (ktp_aux_t *)data; + int d_idx = 0; + while (1) + { + // fprintf(stderr, "start write: %ld\n", aux->write_idx); + POSSESS(output_have); + WAIT_FOR(output_have, NOT_TO_BE, 0); + RELEASE(output_have); + if (aux->write_idx < aux->calc_idx) { + write_data(aux, aux->data + d_idx); + d_idx = !d_idx; + aux->write_idx++; + } + if (aux->calc_complete) { + if (aux->write_idx < aux->calc_idx) + write_data(aux, aux->data + d_idx); + break; + } + POSSESS(output_have); + TWIST(output_have, BY, -1); + // fprintf(stderr, "next write: %ld\n", aux->write_idx); + } + return 0; +} + +static void new_pipeline(ktp_aux_t *aux) +{ + input_have = NEW_LOCK(2); + output_have = NEW_LOCK(0); + pthread_t tid[3]; + int i; + + pthread_create(&tid[0], 0, thread_read, aux); + pthread_create(&tid[1], 0, thread_calc, aux); + pthread_create(&tid[2], 0, thread_write, aux); + + for (i = 0; i < 3; ++i) pthread_join(tid[i], 0); +} + +////////////// + static void update_a(mem_opt_t *opt, const mem_opt_t *opt0) { if (opt0->a) { // matching score is changed @@ -163,11 +409,23 @@ static void update_a(mem_opt_t *opt, const mem_opt_t *opt0) int main_mem(int argc, char *argv[]) { -#ifdef SHOW_PERF + +#ifdef DEBUG_OUTPUT gfp1 = fopen("./fp1.txt", "w"); gfp2 = fopen("./fp2.txt", "w"); gfp3 = fopen("./fp3.txt", "w"); #endif + +#ifdef SHOW_PERF +#if USE_RDTSC + uint64_t tmp_time = __rdtsc(); + sleep(1); + proc_freq = __rdtsc() - tmp_time; +#else + proc_freq = 1000; +#endif +#endif + mem_opt_t *opt, opt0; int fd, fd2, i, c, ignore_alt = 0, no_mt_io = 0; int fixed_chunk_size = -1; @@ -184,9 +442,10 @@ int main_mem(int argc, char *argv[]) aux.opt = opt = mem_opt_init(); memset(&opt0, 0, sizeof(mem_opt_t)); - while ((c = getopt(argc, argv, "51qpaMCSPVYjuk:c:v:s:r:t:b:R:A:B:O:E:U:w:L:d:T:Q:D:m:I:N:o:f:W:x:G:h:y:K:X:H:F:z:")) >= 0) { + while ((c = getopt(argc, argv, "512qpaMCSPVYjuk:c:v:s:r:t:b:R:A:B:O:E:U:w:L:d:T:Q:D:m:I:N:o:f:W:x:G:h:y:K:X:H:F:z:")) >= 0) { if (c == 'k') opt->min_seed_len = atoi(optarg), opt0.min_seed_len = 1; else if (c == '1') no_mt_io = 1; + else if (c == '2') no_mt_io = 2; else if (c == 'x') mode = optarg; else if (c == 'w') opt->w = atoi(optarg), opt0.w = 1; else if (c == 'A') opt->a = atoi(optarg), opt0.a = 1; @@ -313,6 +572,7 @@ int main_mem(int argc, char *argv[]) fprintf(stderr, " -W INT discard a chain if seeded bases shorter than INT [0]\n"); fprintf(stderr, " -m INT perform at most INT rounds of mate rescues for each read [%d]\n", opt->max_matesw); fprintf(stderr, " -S skip mate rescue\n"); + fprintf(stderr, " -2 read and write in different disks \n"); fprintf(stderr, " -P skip pairing; mate rescue performed unless -S also in use\n"); fprintf(stderr, "\nScoring options:\n\n"); fprintf(stderr, " -A INT score for a sequence match, which scales options -TdBOELU unless overridden [%d]\n", opt->a); @@ -391,6 +651,16 @@ int main_mem(int argc, char *argv[]) } } else update_a(opt, &opt0); bwa_fill_scmat(opt->a, opt->b, opt->mat); + +#ifdef SHOW_PERF + int64_t tmp_time1, tmp_time2; +#if USE_RDTSC + tmp_time1 = __rdtsc(); +#else + tmp_time1 = realtime_msec(); +#endif +#endif + aux.idx = bwa_idx_load_from_shm(argv[optind]); if (aux.idx == 0) { if ((aux.idx = bwa_idx_load(argv[optind], BWA_IDX_ALL)) == 0) return 1; // FIXME: memory leak @@ -400,6 +670,15 @@ int main_mem(int argc, char *argv[]) for (i = 0; i < aux.idx->bns->n_seqs; ++i) aux.idx->bns->anns[i].is_alt = 0; +#ifdef SHOW_PERF +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); +#endif + time_load_idx += tmp_time2 - tmp_time1; +#endif + ko = kopen(argv[optind + 1], &fd); if (ko == 0) { if (bwa_verbose >= 1) fprintf(stderr, "[E::%s] fail to open file `%s'.\n", __func__, argv[optind + 1]); @@ -430,8 +709,27 @@ int main_mem(int argc, char *argv[]) bwa_print_sam_hdr(aux.idx->bns, hdr_line); aux.actual_chunk_size = fixed_chunk_size > 0? fixed_chunk_size : opt->chunk_size * opt->n_threads; - kt_pipeline(no_mt_io? 1 : 2, process, &aux, 3); - //kt_pipeline(3, process, &aux, 3); + +#ifdef SHOW_PERF +#if USE_RDTSC + tmp_time1 = __rdtsc(); +#else + tmp_time1 = realtime_msec(); +#endif +#endif + + if (no_mt_io == 2) new_pipeline(&aux); + else kt_pipeline(no_mt_io? 1 : 2, process, &aux, 3); + +#ifdef SHOW_PERF +#if USE_RDTSC + tmp_time2 = __rdtsc(); +#else + tmp_time2 = realtime_msec(); +#endif + time_process_data += tmp_time2 - tmp_time1; +#endif + free(hdr_line); free(opt); bwa_idx_destroy(aux.idx); @@ -443,26 +741,60 @@ int main_mem(int argc, char *argv[]) } #ifdef SHOW_PERF - int64_t avg_seed_1 = 0, avg_seed_2 = 0, avg_seed_3 = 0, avg_sa = 0, avg_bsw = 0; + uint64_t avg_seed_1 = 0, avg_seed_2 = 0, avg_seed_3 = 0, + avg_sa = 0, avg_chain = 0, avg_seed_all = 0, + avg_bsw = 0, avg_bsw_all = 0; + uint64_t min_seed_all = UINT64_MAX, min_bsw_all = UINT64_MAX; + uint64_t max_seed_all = 0, max_bsw_all = 0; + for (i = 0; i < opt->n_threads; ++i) { avg_seed_1 += aux.w->aux[i]->time_seed_1; avg_seed_2 += aux.w->aux[i]->time_seed_2; avg_seed_3 += aux.w->aux[i]->time_seed_3; avg_sa += aux.w->aux[i]->time_sa; + avg_chain += aux.w->aux[i]->time_chain; + avg_seed_all += aux.w->aux[i]->time_seed_all; + avg_bsw += aux.w->aux[i]->time_bsw; + avg_bsw_all += aux.w->aux[i]->time_bsw_all; + + min_seed_all = MIN(min_seed_all, aux.w->aux[i]->time_seed_all); + min_bsw_all = MIN(min_bsw_all, aux.w->aux[i]->time_bsw_all); + max_seed_all = MAX(max_seed_all, aux.w->aux[i]->time_seed_all); + max_bsw_all = MAX(max_bsw_all, aux.w->aux[i]->time_bsw_all); } + fprintf(stderr, "\n"); - fprintf(stderr, "time_work1: %f s\n", time_work1 / 1000.0); - fprintf(stderr, "time_work2: %f s\n", time_work2 / 1000.0); - fprintf(stderr, "time_process_seq: %f s\n", (time_work1 + time_work2) / 1000.0); - fprintf(stderr, "time_seed_1: %f s\n", avg_seed_1 / 1000.0 / opt->n_threads); - fprintf(stderr, "time_seed_2: %f s\n", avg_seed_2 / 1000.0 / opt->n_threads); - fprintf(stderr, "time_seed_3: %f s\n", avg_seed_3 / 1000.0 / opt->n_threads); - fprintf(stderr, "time_bwt_sa: %f s\n", avg_sa / 1000.0 / opt->n_threads); - fprintf(stderr, "time_bsw: %f s\n", avg_bsw / 1000.0 / opt->n_threads); - fprintf(stderr, "full_match: %ld, all: %ld\n", s1n, s2n); + fprintf(stderr, "time_load_idx: %0.2lf s\n", time_load_idx * 1.0 / proc_freq); + fprintf(stderr, "time_process_data: %0.2lf s\n", time_process_data * 1.0 / proc_freq); + fprintf(stderr, "time_read: %0.2lf s\n", time_read * 1.0 / proc_freq); + fprintf(stderr, "time_compute: %0.2lf s\n", time_compute * 1.0 / proc_freq); + fprintf(stderr, "time_write: %0.2lf s\n", time_write * 1.0 / proc_freq); + + fprintf(stderr, "time_work_kernel: %0.2lf s\n", time_work_kernel * 1.0 / proc_freq); + fprintf(stderr, "time_work_sam: %0.2lf s\n", time_work_sam * 1.0 / proc_freq); + fprintf(stderr, "time_process_seq: %0.2lf s\n", (time_work_kernel + time_work_sam) * 1.0 / proc_freq); + + fprintf(stderr, "time_seed_all: %0.2lf (%0.2lf, %0.2lf) s\n", avg_seed_all * 1.0 / proc_freq / opt->n_threads, min_seed_all * 1.0 / proc_freq, max_seed_all * 1.0 / proc_freq); + fprintf(stderr, "time_seed_1: %0.2lf s\n", avg_seed_1 * 1.0 / proc_freq / opt->n_threads); + fprintf(stderr, "time_seed_2: %0.2lf s\n", avg_seed_2 * 1.0 / proc_freq / opt->n_threads); + fprintf(stderr, "time_seed_3: %0.2lf s\n", avg_seed_3 * 1.0 / proc_freq / opt->n_threads); + fprintf(stderr, "time_sa: %0.2lf s\n", avg_sa * 1.0 / proc_freq / opt->n_threads); + fprintf(stderr, "time_chain: %0.2lf s\n", avg_chain * 1.0 / proc_freq / opt->n_threads); + + fprintf(stderr, "time_bsw_all: %0.2lf (%0.2lf, %0.2lf) s\n", avg_bsw_all * 1.0 / proc_freq / opt->n_threads, min_bsw_all * 1.0 / proc_freq, max_bsw_all * 1.0 / proc_freq); + fprintf(stderr, "time_bsw: %0.2lf s\n", avg_bsw * 1.0 / proc_freq / opt->n_threads); fprintf(stderr, "\n"); +#endif + +#ifdef SHOW_DATA_PERF + fprintf(stderr, "\n"); + fprintf(stderr, "full_match: %ld, all: %ld\n", gdat[1], gdat[0]); + fprintf(stderr, "\n"); +#endif + +#ifdef DEBUG_OUTPUT fclose(gfp1); fclose(gfp2); fclose(gfp3); diff --git a/ksw_extend2_avx2.c b/ksw_extend2_avx2.c index 4279e26..c1b0ebe 100644 --- a/ksw_extend2_avx2.c +++ b/ksw_extend2_avx2.c @@ -10,8 +10,6 @@ extern FILE *gfp1, *gfp2, *gfp3; #endif -//#define DEBUG_SW_EXTEND 1 - #define NO_VAL -1 #ifdef __GNUC__ @@ -151,7 +149,7 @@ static const uint16_t h_vec_int_mask[SIMD_WIDTH][SIMD_WIDTH] = { max_vec = _mm256_max_epu16(max_vec, _mm256_alignr_epi8(max_vec, max_vec, 8)); \ max_vec = _mm256_max_epu16(max_vec, _mm256_permute2x128_si256(max_vec, max_vec, 0x01)); \ int16_t *maxVal = (int16_t*)&max_vec; \ - m = MAX(m,maxVal[0]); \ + m = MAX(m, maxVal[0]); /*用来解决与BSW结果不一样的第二种情况(上边界)*/ \ if (maxVal[0] > 0) { \ for(j=beg, i=iend; j<=end; j+=SIMD_WIDTH, i-=SIMD_WIDTH) { \ __m256i h2_vec = _mm256_loadu_si256((__m256i*) (&hA2[j])); \ @@ -200,8 +198,10 @@ int ksw_extend2_avx2(int qlen, // query length 待匹配段碱基的query长度 buf_t *buf) // 之前已经开辟过的缓存 { //return ksw_extend2_origin(qlen, query, tlen, target, is_left, m, mat, o_del, e_del, o_ins, e_ins, w, end_bonus, zdrop, h0, _qle, _tle, _gtle, _gscore, _max_off); - + +#ifdef DEBUG_OUTPUT //fprintf(gfp1, "%d\n", qlen); +#endif if (qlen * a + h0 < 255) return ksw_extend2_avx2_u8(qlen, query, tlen, target, is_left, m, mat, o_del, e_del, o_ins, e_ins, a, b, w, end_bonus, zdrop, h0, _qle, _tle, _gtle, _gscore, _max_off, buf); int16_t *mA,*hA, *eA, *fA, *mA1, *mA2, *hA0, *hA1, *eA1, *fA1, *hA2, *eA2, *fA2; // hA0保存上上个col的H,其他的保存上个H E F M @@ -277,7 +277,7 @@ int ksw_extend2_avx2(int qlen, // query length 待匹配段碱基的query长度 int m_last=0; int iend; -//#undef KSW_EQUAL + #ifdef KSW_EQUAL int midx = 1, icheck = 0, checkspecial = 1; int m3 = 0, m2 = 0, m1 = 0; @@ -375,6 +375,7 @@ int ksw_extend2_avx2(int qlen, // query length 待匹配段碱基的query长度 SIMD_FIND_MAX; #ifdef KSW_EQUAL +// 用来解决与BSW结果不一样的第一种情况(左边界) #if 0 if (hA1[0] < b && checkspecial) { int mi; @@ -477,12 +478,12 @@ int ksw_extend2_avx2(int qlen, // query length 待匹配段碱基的query长度 SWAP_DATA_POINTER; } +#ifdef DEBUG_OUTPUT #ifdef DEBUG_SW_EXTEND fprintf(gfp1, "qlen: %d, tlen: %d, h0: %d, w: %d, mi: %d, mj: %d, mie: %d, max_off: %d, score: %d, max: %d\n", qlen, tlen, h0, w, max_i + 1, max_j + 1, max_ie + 1, max_off, gscore, max); fprintf(gfp2, "qlen: %d, tlen: %d, h0: %d, w: %d, mi: %d, mj: %d, mie: %d, max_off: %d, score: %d, max: %d\n", qlen, tlen, h0, w, max_i + 1, max_j + 1, max_ie + 1, max_off, gscore, max); fprintf(gfp3, "qlen: %d, tlen: %d, h0: %d, w: %d, mi: %d, mj: %d, mie: %d, max_off: %d, score: %d, max: %d\n", qlen, tlen, h0, w, max_i + 1, max_j + 1, max_ie + 1, max_off, gscore, max); -#endif -#ifdef DEBUG_SW_EXTEND + fprintf(gfp1, "%-4d", -1); fprintf(gfp2, "%-4d", -1); fprintf(gfp3, "%-4d", -1); @@ -518,6 +519,7 @@ int ksw_extend2_avx2(int qlen, // query length 待匹配段碱基的query长度 fprintf(gfp2, "\n"); fprintf(gfp3, "\n"); } +#endif #endif // free(mem); if (_qle) *_qle = max_j + 1; @@ -621,7 +623,8 @@ int ksw_extend2_origin(int qlen, // query length 待匹配段碱基的query长 } ins[0][0] = del[0][0] = score[0][0] = h0; #endif -#ifdef SHOW_PERF + +#ifdef DEBUG_OUTPUT // fprintf(gfp1, "start\n"); int bsw_cal_num = 0; int real_cal_num = 0; @@ -634,6 +637,7 @@ int ksw_extend2_origin(int qlen, // query length 待匹配段碱基的query长 } // fprintf(gfp1, "%d\n", bsw_cal_num); #endif + for (i = 0; LIKELY(i < tlen); ++i) { int t, f = 0, h1, m = 0, mj = -1; int8_t *q = &qp[ref[i] * qlen]; @@ -646,10 +650,10 @@ int ksw_extend2_origin(int qlen, // query length 待匹配段碱基的query长 h1 = h0 - (o_del + e_del * (i + 1)); if (h1 < 0) h1 = 0; } else h1 = 0; - //m = h1; + //m = h1; // 用来解决和VP-BSW结果不一样的第一种情况(左边界) for (j = beg; LIKELY(j < end); ++j) { -#ifdef SHOW_PERF +#ifdef DEBUG_OUTPUT real_cal_num++; #endif @@ -668,6 +672,7 @@ int ksw_extend2_origin(int qlen, // query length 待匹配段碱基的query长 h = M > e? M : e; // e and f are guaranteed to be non-negative, so h>=0 even if M<0 h = h > f? h : f; h1 = h; // save H(i,j) to h1 for the next column + #ifdef DEBUG_SW_EXTEND score[i+1][j+1] = h; #endif @@ -677,6 +682,7 @@ int ksw_extend2_origin(int qlen, // query length 待匹配段碱基的query长 t = t > 0? t : 0; e -= e_del; e = e > t? e : t; // computed E(i+1,j) + #ifdef DEBUG_SW_EXTEND del[i+1][j+1] = e; #endif @@ -713,12 +719,12 @@ int ksw_extend2_origin(int qlen, // query length 待匹配段碱基的query长 // fprintf(stderr, "beg: %d; end: %d\n", beg, end); //} } +#ifdef DEBUG_OUTPUT #ifdef DEBUG_SW_EXTEND fprintf(gfp1, "qlen: %d, tlen: %d, h0: %d, w: %d, mi: %d, mj: %d, mie: %d, max_off: %d, score: %d, max: %d\n", qlen, tlen, h0, w, max_i + 1, max_j + 1, max_ie + 1, max_off, gscore, max); fprintf(gfp2, "qlen: %d, tlen: %d, h0: %d, w: %d, mi: %d, mj: %d, mie: %d, max_off: %d, score: %d, max: %d\n", qlen, tlen, h0, w, max_i + 1, max_j + 1, max_ie + 1, max_off, gscore, max); fprintf(gfp3, "qlen: %d, tlen: %d, h0: %d, w: %d, mi: %d, mj: %d, mie: %d, max_off: %d, score: %d, max: %d\n", qlen, tlen, h0, w, max_i + 1, max_j + 1, max_ie + 1, max_off, gscore, max); -#endif -#ifdef DEBUG_SW_EXTEND + fprintf(gfp1, "%-4d", -1); fprintf(gfp2, "%-4d", -1); fprintf(gfp3, "%-4d", -1); @@ -759,7 +765,9 @@ int ksw_extend2_origin(int qlen, // query length 待匹配段碱基的query长 fprintf(gfp3, "\n"); } #endif -#ifdef SHOW_PERF +#endif + +#ifdef DEBUG_OUTPUT //fprintf(gfp1, "%d\nend\n", real_cal_num); #endif diff --git a/run.sh b/run.sh index 7d755eb..64ce110 100755 --- a/run.sh +++ b/run.sh @@ -1,12 +1,16 @@ -thread=64 +thread=1 #n_r1=~/data/fastq/dataset/na12878_wgs_150/bn1.fq #n_r2=~/data/fastq/dataset/na12878_wgs_150/bn2.fq -n_r1=~/data/fastq/dataset/na12878_wes_144/SRR25735653_1.fastq -n_r2=~/data/fastq/dataset/na12878_wes_144/SRR25735653_2.fastq +#n_r1=~/data/fastq/dataset/na12878_wes_144/SRR25735653_1.fastq +#n_r2=~/data/fastq/dataset/na12878_wes_144/SRR25735653_2.fastq #n_r1=~/data/fastq/dataset/na12878_wgs_150/n1.fq #n_r2=~/data/fastq/dataset/na12878_wgs_150/n2.fq #n_r1=~/data/fastq/dataset/na12878_wgs_101/na_1.fq #n_r2=~/data/fastq/dataset/na12878_wgs_101/na_2.fq +n_r1=~/data/fastq/dataset/zy_wes/sn1.fq +n_r2=~/data/fastq/dataset/zy_wes/sn2.fq + + #n_r1=~/data/fastq/zy/wgs/n1.fq #n_r2=~/data/fastq/zy/wgs/n2.fq #n_r1=~/data/fastq/ds/n1.fq @@ -72,6 +76,6 @@ time ./bwa mem -t $thread -b 256 -M -R @RG\\tID:normal\\tSM:normal\\tPL:illumina $reference \ $n_r1 \ $n_r2 \ - -o $out + -o $out -2 diff --git a/utils.c b/utils.c index b957de8..a051c5a 100644 --- a/utils.c +++ b/utils.c @@ -310,11 +310,11 @@ double realtime(void) return tp.tv_sec + tp.tv_usec * 1e-6; } -int64_t realtime_msec(void) +uint64_t realtime_msec(void) { struct timeval tv; gettimeofday(&tv, NULL); - return (int64_t)1000 * (tv.tv_sec + ((1e-6) * tv.tv_usec)); + return (uint64_t)1000 * (tv.tv_sec + ((1e-6) * tv.tv_usec)); } long peakrss(void) diff --git a/utils.h b/utils.h index 6441a6a..1653f9b 100644 --- a/utils.h +++ b/utils.h @@ -31,31 +31,24 @@ #include #include +#define USE_RDTSC 0 + #ifdef SHOW_PERF +extern uint64_t time_process_data, time_read, time_write, time_compute, + time_seed_1, time_seed_2, time_seed_3, + time_seed_sa, time_seed_chain, time_seed_all, + time_bsw, time_bsw_all, + time_work_kernel, time_work_sam, + time_load_idx; +extern uint64_t proc_freq; +#endif -extern int64_t time_ksw_extend2, - time_ksw_global2, - time_ksw_align2, - time_bwt_smem1a, - time_seed_1, - time_seed_2, - time_seed_3, - time_fmt_smem_0, - time_bwt_extend, - time_bwt_occ4, - time_bwt_sa, - time_bwt_sa_read, - time_bns, -time_work1, - time_work2, - time_flt_chain; +#ifdef SHOW_DATA_PERF +extern int64_t gdat[100]; +#endif -extern int64_t gn[100]; -extern int64_t dn, n16, n17, n18, n19, nall, num_sa; -extern int64_t s1n, s2n, s3n, s1l, s2l, s3l; -extern int64_t g_num_smem2; +#ifdef DEBUG_OUTPUT extern FILE *gfp1, *gfp2, *gfp3; - #endif #undef MAX @@ -79,6 +72,24 @@ extern FILE *gfp1, *gfp2, *gfp3; #define xassert(cond, msg) if ((cond) == 0) _err_fatal_simple_core(__func__, msg) +#if defined(__GNUC__) && __GNUC__ < 11 && !defined(__clang__) +#if defined(__i386__) +static inline unsigned long long __rdtsc(void) +{ + unsigned long long int x; + __asm__ volatile(".byte 0x0f, 0x31" : "=A"(x)); + return x; +} +#elif defined(__x86_64__) +static inline unsigned long long __rdtsc(void) +{ + unsigned hi, lo; + __asm__ __volatile__("rdtsc" : "=a"(lo), "=d"(hi)); + return ((unsigned long long)lo) | (((unsigned long long)hi) << 32); +} +#endif +#endif + typedef struct { uint64_t x, y; } pair64_t; @@ -121,7 +132,7 @@ extern "C" { double cputime(void); double realtime(void); - int64_t realtime_msec(void); + uint64_t realtime_msec(void); long peakrss(void); void ks_introsort_64 (size_t n, uint64_t *a); diff --git a/yarn.c b/yarn.c new file mode 100644 index 0000000..254ba62 --- /dev/null +++ b/yarn.c @@ -0,0 +1,398 @@ +/* yarn.c -- generic thread operations implemented using pthread functions + * Copyright (C) 2008, 2011, 2012, 2015, 2018, 2019, 2020 Mark Adler + * Version 1.7 12 Apr 2020 Mark Adler + * For conditions of distribution and use, see copyright notice in yarn.h + */ + +/* Basic thread operations implemented using the POSIX pthread library. All + pthread references are isolated within this module to allow alternate + implementations with other thread libraries. See yarn.h for the description + of these operations. */ + +/* Version history: + 1.0 19 Oct 2008 First version + 1.1 26 Oct 2008 No need to set the stack size -- remove + Add yarn_abort() function for clean-up on error exit + 1.2 19 Dec 2011 (changes reversed in 1.3) + 1.3 13 Jan 2012 Add large file #define for consistency with pigz.c + Update thread portability #defines per IEEE 1003.1-2008 + Fix documentation in yarn.h for yarn_prefix + 1.4 19 Jan 2015 Allow yarn_abort() to avoid error message to stderr + Accept and do nothing for NULL argument to free_lock() + 1.5 8 May 2018 Remove destruct() to avoid use of pthread_cancel() + Normalize the code style + 1.6 3 Apr 2019 Add debugging information to fail() error messages + 1.7 12 Apr 2020 Fix use after free bug in ignition() + */ + +// For thread portability. +#define _XOPEN_SOURCE 700 +#define _POSIX_C_SOURCE 200809L +#define _THREAD_SAFE + +// Use large file functions if available. +#define _FILE_OFFSET_BITS 64 + +// External libraries and entities referenced. +#include // fprintf(), stderr +#include // exit(), malloc(), free(), NULL +#include // pthread_t, pthread_create(), pthread_join(), + // pthread_attr_t, pthread_attr_init(), pthread_attr_destroy(), + // PTHREAD_CREATE_JOINABLE, pthread_attr_setdetachstate(), + // pthread_self(), pthread_equal(), + // pthread_mutex_t, PTHREAD_MUTEX_INITIALIZER, pthread_mutex_init(), + // pthread_mutex_lock(), pthread_mutex_unlock(), pthread_mutex_destroy(), + // pthread_cond_t, PTHREAD_COND_INITIALIZER, pthread_cond_init(), + // pthread_cond_broadcast(), pthread_cond_wait(), pthread_cond_destroy() +#include // EPERM, ESRCH, EDEADLK, ENOMEM, EBUSY, EINVAL, EAGAIN + +// Interface definition. +#include "yarn.h" + +// Constants. +#define local static // for non-exported functions and globals + +// Error handling external globals, resettable by application. +char *yarn_prefix = (char*)"yarn"; +void (*yarn_abort)(int) = NULL; + + +// Immediately exit -- use for errors that shouldn't ever happen. +local void fail(int err, char const *file, long line, char const *func) { + fprintf(stderr, "%s: ", yarn_prefix); + switch (err) { + case EPERM: + fputs("already unlocked", stderr); + break; + case ESRCH: + fputs("no such thread", stderr); + break; + case EDEADLK: + fputs("resource deadlock", stderr); + break; + case ENOMEM: + fputs("out of memory", stderr); + break; + case EBUSY: + fputs("can't destroy locked resource", stderr); + break; + case EINVAL: + fputs("invalid request", stderr); + break; + case EAGAIN: + fputs("resource unavailable", stderr); + break; + default: + fprintf(stderr, "internal error %d", err); + } + fprintf(stderr, " (%s:%ld:%s)\n", file, line, func); + if (yarn_abort != NULL) + yarn_abort(err); + exit(err); +} + +// Memory handling routines provided by user. If none are provided, malloc() +// and free() are used, which are therefore assumed to be thread-safe. +typedef void *(*malloc_t)(size_t); +typedef void (*free_t)(void *); +local malloc_t my_malloc_f = malloc; +local free_t my_free = free; + +// Use user-supplied allocation routines instead of malloc() and free(). +void yarn_mem(malloc_t lease, free_t vacate) { + my_malloc_f = lease; + my_free = vacate; +} + +// Memory allocation that cannot fail (from the point of view of the caller). +local void *my_malloc(size_t size, char const *file, long line) { + void *block; + + if ((block = my_malloc_f(size)) == NULL) + fail(ENOMEM, file, line, "malloc"); + return block; +} + +// -- Lock functions -- + +struct lock_s { + pthread_mutex_t mutex; + pthread_cond_t cond; + long value; +}; + +lock_t *new_lock_(long initial, char const *file, long line) { + lock_t *bolt = (lock_t *)my_malloc(sizeof(struct lock_s), file, line); + int ret = pthread_mutex_init(&(bolt->mutex), NULL); + if (ret) + fail(ret, file, line, "mutex_init"); + ret = pthread_cond_init(&(bolt->cond), NULL); + if (ret) + fail(ret, file, line, "cond_init"); + bolt->value = initial; + return bolt; +} + +void possess_(lock_t *bolt, char const *file, long line) { + int ret = pthread_mutex_lock(&(bolt->mutex)); + if (ret) + fail(ret, file, line, "mutex_lock"); +} + +void release_(lock_t *bolt, char const *file, long line) { + int ret = pthread_mutex_unlock(&(bolt->mutex)); + if (ret) + fail(ret, file, line, "mutex_unlock"); +} + +void twist_(lock_t *bolt, enum twist_op op, long val, + char const *file, long line) { + if (op == TO) + bolt->value = val; + else if (op == BY) + bolt->value += val; + int ret = pthread_cond_broadcast(&(bolt->cond)); + if (ret) + fail(ret, file, line, "cond_broadcast"); + ret = pthread_mutex_unlock(&(bolt->mutex)); + if (ret) + fail(ret, file, line, "mutex_unlock"); +} + +#define until(a) while(!(a)) + +void wait_for_(lock_t *bolt, enum wait_op op, long val, + char const *file, long line) { + switch (op) { + case TO_BE: + until (bolt->value == val) { + int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex)); + if (ret) + fail(ret, file, line, "cond_wait"); + } + break; + case NOT_TO_BE: + until (bolt->value != val) { + int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex)); + if (ret) + fail(ret, file, line, "cond_wait"); + } + break; + case TO_BE_MORE_THAN: + until (bolt->value > val) { + int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex)); + if (ret) + fail(ret, file, line, "cond_wait"); + } + break; + case TO_BE_LESS_THAN: + until (bolt->value < val) { + int ret = pthread_cond_wait(&(bolt->cond), &(bolt->mutex)); + if (ret) + fail(ret, file, line, "cond_wait"); + } + } +} + +long peek_lock(lock_t *bolt) { + return bolt->value; +} + +void free_lock_(lock_t *bolt, char const *file, long line) { + if (bolt == NULL) + return; + int ret = pthread_cond_destroy(&(bolt->cond)); + if (ret) + fail(ret, file, line, "cond_destroy"); + ret = pthread_mutex_destroy(&(bolt->mutex)); + if (ret) + fail(ret, file, line, "mutex_destroy"); + my_free(bolt); +} + +// -- Thread functions (uses the lock_t functions above) -- + +struct thread_s { + pthread_t id; + int done; // true if this thread has exited + thread *next; // for list of all launched threads +}; + +// List of threads launched but not joined, count of threads exited but not +// joined (incremented by ignition() just before exiting). +local lock_t threads_lock = { + PTHREAD_MUTEX_INITIALIZER, + PTHREAD_COND_INITIALIZER, + 0 // number of threads exited but not joined +}; +local thread *threads = NULL; // list of extant threads + +// Structure in which to pass the probe and its payload to ignition(). +struct capsule { + void (*probe)(void *); + void *payload; + char const *file; + long line; +}; + +// Mark the calling thread as done and alert join_all(). +local void reenter(void *arg) { + struct capsule *capsule = (struct capsule *)arg; + + // find this thread in the threads list by matching the thread id + pthread_t me = pthread_self(); + possess_(&(threads_lock), capsule->file, capsule->line); + thread **prior = &(threads); + thread *match; + while ((match = *prior) != NULL) { + if (pthread_equal(match->id, me)) + break; + prior = &(match->next); + } + if (match == NULL) + fail(ESRCH, capsule->file, capsule->line, "reenter lost"); + + // mark this thread as done and move it to the head of the list + match->done = 1; + if (threads != match) { + *prior = match->next; + match->next = threads; + threads = match; + } + + // update the count of threads to be joined and alert join_all() + twist_(&(threads_lock), BY, +1, capsule->file, capsule->line); + + // free the capsule resource, even if the thread is cancelled (though yarn + // doesn't use pthread_cancel() -- you never know) + my_free(capsule); +} + +// All threads go through this routine. Just before a thread exits, it marks +// itself as done in the threads list and alerts join_all() so that the thread +// resources can be released. Use a cleanup stack so that the marking occurs +// even if the thread is cancelled. +local void *ignition(void *arg) { + struct capsule *capsule = (struct capsule *)arg; + + // run reenter() before leaving + pthread_cleanup_push(reenter, arg); + + // execute the requested function with argument + capsule->probe(capsule->payload); + + // mark this thread as done, letting join_all() know, and free capsule + pthread_cleanup_pop(1); + + // exit thread + return NULL; +} + +// Not all POSIX implementations create threads as joinable by default, so that +// is made explicit here. +thread *launch_(void (*probe)(void *), void *payload, + char const *file, long line) { + // construct the requested call and argument for the ignition() routine + // (allocated instead of automatic so that we're sure this will still be + // there when ignition() actually starts up -- ignition() will free this + // allocation) + struct capsule *capsule = (struct capsule *)my_malloc(sizeof(struct capsule), file, line); + capsule->probe = probe; + capsule->payload = payload; + capsule->file = file; + capsule->line = line; + + // assure this thread is in the list before join_all() or ignition() looks + // for it + possess_(&(threads_lock), file, line); + + // create the thread and call ignition() from that thread + thread *th = (thread *)my_malloc(sizeof(struct thread_s), file, line); + pthread_attr_t attr; + int ret = pthread_attr_init(&attr); + if (ret) + fail(ret, file, line, "attr_init"); + ret = pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); + if (ret) + fail(ret, file, line, "attr_setdetachstate"); + ret = pthread_create(&(th->id), &attr, ignition, capsule); + if (ret) + fail(ret, file, line, "create"); + ret = pthread_attr_destroy(&attr); + if (ret) + fail(ret, file, line, "attr_destroy"); + + // put the thread in the threads list for join_all() + th->done = 0; + th->next = threads; + threads = th; + release_(&(threads_lock), file, line); + return th; +} + +void join_(thread *ally, char const *file, long line) { + // wait for thread to exit and return its resources + int ret = pthread_join(ally->id, NULL); + if (ret) + fail(ret, file, line, "join"); + + // find the thread in the threads list + possess_(&(threads_lock), file, line); + thread **prior = &(threads); + thread *match; + while ((match = *prior) != NULL) { + if (match == ally) + break; + prior = &(match->next); + } + if (match == NULL) + fail(ESRCH, file, line, "join lost"); + + // remove thread from list and update exited count, free thread + if (match->done) + threads_lock.value--; + *prior = match->next; + release_(&(threads_lock), file, line); + my_free(ally); +} + +// This implementation of join_all() only attempts to join threads that have +// announced that they have exited (see ignition()). When there are many +// threads, this is faster than waiting for some random thread to exit while a +// bunch of other threads have already exited. +int join_all_(char const *file, long line) { + // grab the threads list and initialize the joined count + int count = 0; + possess_(&(threads_lock), file, line); + + // do until threads list is empty + while (threads != NULL) { + // wait until at least one thread has reentered + wait_for_(&(threads_lock), NOT_TO_BE, 0, file, line); + + // find the first thread marked done (should be at or near the top) + thread **prior = &(threads); + thread *match; + while ((match = *prior) != NULL) { + if (match->done) + break; + prior = &(match->next); + } + if (match == NULL) + fail(ESRCH, file, line, "join_all lost"); + + // join the thread (will be almost immediate), remove from the threads + // list, update the reenter count, and free the thread + int ret = pthread_join(match->id, NULL); + if (ret) + fail(ret, file, line, "join"); + threads_lock.value--; + *prior = match->next; + my_free(match); + count++; + } + + // let go of the threads list and return the number of threads joined + release_(&(threads_lock), file, line); + return count; +} \ No newline at end of file diff --git a/yarn.h b/yarn.h new file mode 100644 index 0000000..ab5d31e --- /dev/null +++ b/yarn.h @@ -0,0 +1,139 @@ +/* yarn.h -- generic interface for thread operations + * Copyright (C) 2008, 2011, 2012, 2015, 2018, 2019, 2020 Mark Adler + * Version 1.7 12 Apr 2020 Mark Adler + */ + +/* + This software is provided 'as-is', without any express or implied + warranty. In no event will the author be held liable for any damages + arising from the use of this software. + + Permission is granted to anyone to use this software for any purpose, + including commercial applications, and to alter it and redistribute it + freely, subject to the following restrictions: + + 1. The origin of this software must not be misrepresented; you must not + claim that you wrote the original software. If you use this software + in a product, an acknowledgment in the product documentation would be + appreciated but is not required. + 2. Altered source versions must be plainly marked as such, and must not be + misrepresented as being the original software. + 3. This notice may not be removed or altered from any source distribution. + + Mark Adler + madler@alumni.caltech.edu + */ + +/* Basic thread operations + + This interface isolates the local operating system implementation of threads + from the application in order to facilitate platform independent use of + threads. All of the implementation details are deliberately hidden. + + Assuming adequate system resources and proper use, none of these functions + can fail. As a result, any errors encountered will cause an exit() to be + executed, or the execution of your own optionally-provided abort function. + + These functions allow the simple launching and joining of threads, and the + locking of objects and synchronization of changes of objects. The latter is + implemented with a single lock_t type that contains an integer value. The + value can be ignored for simple exclusive access to an object, or the value + can be used to signal and wait for changes to an object. + + -- Arguments -- + + thread *thread; identifier for launched thread, used by join + void probe(void *); pointer to function "probe", run when thread starts + void *payload; single argument passed to the probe function + lock_t *lock_t; a lock_t with a value -- used for exclusive access to + an object and to synchronize threads waiting for + changes to an object + long val; value to set lock_t, increment lock_t, or wait for + int n; number of threads joined + + -- Thread functions -- + + thread = launch(probe, payload) - launch a thread -- exit via probe() return + join(thread) - join a thread and by joining end it, waiting for the thread + to exit if it hasn't already -- will free the resources allocated by + launch() (don't try to join the same thread more than once) + n = join_all() - join all threads launched by launch() that are not joined + yet and free the resources allocated by the launches, usually to clean + up when the thread processing is done -- join_all() returns an int with + the count of the number of threads joined (join_all() should only be + called from the main thread, and should only be called after any calls + of join() have completed) + + -- Lock functions -- + + lock_t = new_lock(val) - create a new lock_t with initial value val (lock_t is + created in the released state) + possess(lock_t) - acquire exclusive possession of a lock_t, waiting if necessary + twist(lock_t, [TO | BY], val) - set lock_t to or increment lock_t by val, signal + all threads waiting on this lock_t and then release the lock_t -- must + possess the lock_t before calling (twist releases, so don't do a + release() after a twist() on the same lock_t) + wait_for(lock_t, [TO_BE | NOT_TO_BE | TO_BE_MORE_THAN | TO_BE_LESS_THAN], val) + - wait on lock_t value to be, not to be, be greater than, or be less than + val -- must possess the lock_t before calling, will possess the lock_t on + return but the lock_t is released while waiting to permit other threads + to use twist() to change the value and signal the change (so make sure + that the object is in a usable state when waiting) + release(lock_t) - release a possessed lock_t (do not try to release a lock_t that + the current thread does not possess) + val = peek_lock(lock_t) - return the value of the lock_t (assumes that lock_t is + already possessed, no possess or release is done by peek_lock()) + free_lock(lock_t) - free the resources allocated by new_lock() (application + must assure that the lock_t is released before calling free_lock()) + + -- Memory allocation --- + + yarn_mem(better_malloc, better_free) - set the memory allocation and free + routines for use by the yarn routines where the supplied routines have + the same interface and operation as malloc() and free(), and may be + provided in order to supply thread-safe memory allocation routines or + for any other reason -- by default malloc() and free() will be used + + -- Error control -- + + yarn_prefix - a char pointer to a string that will be the prefix for any + error messages that these routines generate before exiting -- if not + changed by the application, "yarn" will be used + yarn_abort - an external function that will be executed when there is an + internal yarn error, due to out of memory or misuse -- this function + may exit to abort the application, or if it returns, the yarn error + handler will exit (set to NULL by default for no action) + */ + +extern char *yarn_prefix; +extern void (*yarn_abort)(int); + +void yarn_mem(void *(*)(size_t), void (*)(void *)); + +typedef struct thread_s thread; +thread *launch_(void (*)(void *), void *, char const *, long); +#define LAUNCH(a, b) launch_(a, b, __FILE__, __LINE__) +void join_(thread *, char const *, long); +#define JOIN(a) join_(a, __FILE__, __LINE__) +int join_all_(char const *, long); +#define JOIN_ALL() join_all_(__FILE__, __LINE__) + +typedef struct lock_s lock_t; +lock_t *new_lock_(long, char const *, long); +#define NEW_LOCK(a) new_lock_(a, __FILE__, __LINE__) +void possess_(lock_t *, char const *, long); +#define POSSESS(a) possess_(a, __FILE__, __LINE__) +void release_(lock_t *, char const *, long); +// #define release(a) release_(a, __FILE__, __LINE__) +#define RELEASE(a) release_(a, __FILE__, __LINE__) +enum twist_op { TO, BY }; +void twist_(lock_t *, enum twist_op, long, char const *, long); +#define TWIST(a, b, c) twist_(a, b, c, __FILE__, __LINE__) +enum wait_op { + TO_BE, /* or */ NOT_TO_BE, /* that is the question */ + TO_BE_MORE_THAN, TO_BE_LESS_THAN }; +void wait_for_(lock_t *, enum wait_op, long, char const *, long); +#define WAIT_FOR(a, b, c) wait_for_(a, b, c, __FILE__, __LINE__) +long peek_lock(lock_t *); +void free_lock_(lock_t *, char const *, long); +#define FREE_LOCK(a) free_lock_(a, __FILE__, __LINE__)