diff --git a/Makefile b/Makefile index 69b2ccd..ff48a20 100644 --- a/Makefile +++ b/Makefile @@ -4,7 +4,7 @@ CFLAGS= -g -Wall -O2 WRAP_MALLOC=-DUSE_MALLOC_WRAPPERS AR= ar DFLAGS= -DHAVE_PTHREAD $(WRAP_MALLOC) -LOBJS= utils.o kstring.o ksw.o bwt.o bntseq.o bwa.o bwamem.o bwamem_pair.o malloc_wrap.o +LOBJS= utils.o kthread.o kstring.o ksw.o bwt.o bntseq.o bwa.o bwamem.o bwamem_pair.o malloc_wrap.o AOBJS= QSufSort.o bwt_gen.o bwase.o bwaseqio.o bwtgap.o bwtaln.o bamlite.o \ is.o bwtindex.o bwape.o kopen.o pemerge.o \ bwtsw2_core.o bwtsw2_main.o bwtsw2_aux.o bwt_lite.o \ diff --git a/bwamem.c b/bwamem.c index 293dd55..61fa0dd 100644 --- a/bwamem.c +++ b/bwamem.c @@ -952,7 +952,6 @@ mem_aln_t mem_reg2aln(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t * } typedef struct { - int start, step, n; const mem_opt_t *opt; const bwt_t *bwt; const bntseq_t *bns; @@ -962,84 +961,51 @@ typedef struct { mem_alnreg_v *regs; } worker_t; -static void *worker1(void *data) +static void worker1(void *data, int i, int tid) { worker_t *w = (worker_t*)data; - int i; if (!(w->opt->flag&MEM_F_PE)) { - for (i = w->start; i < w->n; i += w->step) - w->regs[i] = mem_align1_core(w->opt, w->bwt, w->bns, w->pac, w->seqs[i].l_seq, w->seqs[i].seq); - } else { // for PE we align the two ends in the same thread in case the 2nd read is of worse quality, in which case some threads may be faster/slower - for (i = w->start; i < w->n>>1; i += w->step) { - w->regs[i<<1|0] = mem_align1_core(w->opt, w->bwt, w->bns, w->pac, w->seqs[i<<1|0].l_seq, w->seqs[i<<1|0].seq); - w->regs[i<<1|1] = mem_align1_core(w->opt, w->bwt, w->bns, w->pac, w->seqs[i<<1|1].l_seq, w->seqs[i<<1|1].seq); - } + w->regs[i] = mem_align1_core(w->opt, w->bwt, w->bns, w->pac, w->seqs[i].l_seq, w->seqs[i].seq); + } else { + w->regs[i<<1|0] = mem_align1_core(w->opt, w->bwt, w->bns, w->pac, w->seqs[i<<1|0].l_seq, w->seqs[i<<1|0].seq); + w->regs[i<<1|1] = mem_align1_core(w->opt, w->bwt, w->bns, w->pac, w->seqs[i<<1|1].l_seq, w->seqs[i<<1|1].seq); } - return 0; } -static void *worker2(void *data) +static void worker2(void *data, int i, int tid) { extern int mem_sam_pe(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_pestat_t pes[4], uint64_t id, bseq1_t s[2], mem_alnreg_v a[2]); worker_t *w = (worker_t*)data; - int i; if (!(w->opt->flag&MEM_F_PE)) { - for (i = w->start; i < w->n; i += w->step) { - mem_mark_primary_se(w->opt, w->regs[i].n, w->regs[i].a); - mem_reg2sam_se(w->opt, w->bns, w->pac, &w->seqs[i], &w->regs[i], 0, 0); - free(w->regs[i].a); - } + mem_mark_primary_se(w->opt, w->regs[i].n, w->regs[i].a); + mem_reg2sam_se(w->opt, w->bns, w->pac, &w->seqs[i], &w->regs[i], 0, 0); + free(w->regs[i].a); } else { - int n = 0; - for (i = w->start; i < w->n>>1; i += w->step) { // not implemented yet - n += mem_sam_pe(w->opt, w->bns, w->pac, w->pes, i, &w->seqs[i<<1], &w->regs[i<<1]); - free(w->regs[i<<1|0].a); free(w->regs[i<<1|1].a); - } - fprintf(stderr, "[M::%s@%d] performed mate-SW for %d reads\n", __func__, w->start, n); + mem_sam_pe(w->opt, w->bns, w->pac, w->pes, i, &w->seqs[i<<1], &w->regs[i<<1]); + free(w->regs[i<<1|0].a); free(w->regs[i<<1|1].a); } - return 0; } void mem_process_seqs(const mem_opt_t *opt, const bwt_t *bwt, const bntseq_t *bns, const uint8_t *pac, int n, bseq1_t *seqs, const mem_pestat_t *pes0) { + extern void kt_for(int n_threads, void (*func)(void*,int,int), void *data, int n); int i; - worker_t *w; + worker_t w; mem_alnreg_v *regs; mem_pestat_t pes[4]; - w = calloc(opt->n_threads, sizeof(worker_t)); regs = malloc(n * sizeof(mem_alnreg_v)); for (i = 0; i < opt->n_threads; ++i) { - worker_t *p = &w[i]; - p->start = i; p->step = opt->n_threads; p->n = n; + worker_t *p = &w; p->opt = opt; p->bwt = bwt; p->bns = bns; p->pac = pac; p->seqs = seqs; p->regs = regs; p->pes = &pes[0]; } - -#ifdef HAVE_PTHREAD - if (opt->n_threads == 1) { -#endif - worker1(w); - if (opt->flag&MEM_F_PE) { // paired-end mode - if (pes0) memcpy(pes, pes0, 4 * sizeof(mem_pestat_t)); // if pes0 != NULL, set the insert-size distribution as pes0 - else mem_pestat(opt, bns->l_pac, n, regs, pes); // otherwise, infer the insert size distribution from data - } - worker2(w); -#ifdef HAVE_PTHREAD - } else { - pthread_t *tid; - tid = (pthread_t*)calloc(opt->n_threads, sizeof(pthread_t)); - for (i = 0; i < opt->n_threads; ++i) pthread_create(&tid[i], 0, worker1, &w[i]); - for (i = 0; i < opt->n_threads; ++i) pthread_join(tid[i], 0); - if (opt->flag&MEM_F_PE) { - if (pes0) memcpy(pes, pes0, 4 * sizeof(mem_pestat_t)); - else mem_pestat(opt, bns->l_pac, n, regs, pes); - } - for (i = 0; i < opt->n_threads; ++i) pthread_create(&tid[i], 0, worker2, &w[i]); - for (i = 0; i < opt->n_threads; ++i) pthread_join(tid[i], 0); - free(tid); + kt_for(opt->n_threads, worker1, &w, (opt->flag&MEM_F_PE)? n>>1 : n); // find mapping positions + if (opt->flag&MEM_F_PE) { // infer insert sizes if not provided + if (pes0) memcpy(pes, pes0, 4 * sizeof(mem_pestat_t)); // if pes0 != NULL, set the insert-size distribution as pes0 + else mem_pestat(opt, bns->l_pac, n, regs, pes); // otherwise, infer the insert size distribution from data } -#endif - free(regs); free(w); + kt_for(opt->n_threads, worker2, &w, (opt->flag&MEM_F_PE)? n>>1 : n); // generate alignment + free(regs); } diff --git a/kthread.c b/kthread.c new file mode 100644 index 0000000..a44426b --- /dev/null +++ b/kthread.c @@ -0,0 +1,53 @@ +#include +#include + +struct kt_for_t; + +typedef struct { + struct kt_for_t *t; + int i; +} ktf_worker_t; + +typedef struct kt_for_t { + int n_threads, n; + ktf_worker_t *w; + void (*func)(void*,int,int); + void *data; +} kt_for_t; + +static inline int steal_work(kt_for_t *t) +{ + int i, k, min = 0x7fffffff, min_i = -1; + for (i = 0; i < t->n_threads; ++i) + if (min > t->w[i].i) min = t->w[i].i, min_i = i; + k = __sync_fetch_and_add(&t->w[min_i].i, t->n_threads); + return k >= t->n? -1 : k; +} + +static void *ktf_worker(void *data) +{ + ktf_worker_t *w = (ktf_worker_t*)data; + int i; + for (;;) { + i = __sync_fetch_and_add(&w->i, w->t->n_threads); + if (i >= w->t->n) break; + w->t->func(w->t->data, i, w - w->t->w); + } + while ((i = steal_work(w->t)) >= 0) + w->t->func(w->t->data, i, w - w->t->w); + pthread_exit(0); +} + +void kt_for(int n_threads, void (*func)(void*,int,int), void *data, int n) +{ + int i; + kt_for_t t; + pthread_t *tid; + t.func = func, t.data = data, t.n_threads = n_threads, t.n = n; + t.w = (ktf_worker_t*)alloca(n_threads * sizeof(ktf_worker_t)); + tid = (pthread_t*)alloca(n_threads * sizeof(pthread_t)); + for (i = 0; i < n_threads; ++i) + t.w[i].t = &t, t.w[i].i = i; + for (i = 0; i < n_threads; ++i) pthread_create(&tid[i], 0, ktf_worker, &t.w[i]); + for (i = 0; i < n_threads; ++i) pthread_join(tid[i], 0); +}