use kthread for multi-threading

Bwa-mem should have better performance with many CPU cores.
This commit is contained in:
Heng Li 2013-11-02 12:13:11 -04:00
parent c564653b40
commit 19d33faa30
3 changed files with 74 additions and 55 deletions

View File

@ -4,7 +4,7 @@ CFLAGS= -g -Wall -O2
WRAP_MALLOC=-DUSE_MALLOC_WRAPPERS
AR= ar
DFLAGS= -DHAVE_PTHREAD $(WRAP_MALLOC)
LOBJS= utils.o kstring.o ksw.o bwt.o bntseq.o bwa.o bwamem.o bwamem_pair.o malloc_wrap.o
LOBJS= utils.o kthread.o kstring.o ksw.o bwt.o bntseq.o bwa.o bwamem.o bwamem_pair.o malloc_wrap.o
AOBJS= QSufSort.o bwt_gen.o bwase.o bwaseqio.o bwtgap.o bwtaln.o bamlite.o \
is.o bwtindex.o bwape.o kopen.o pemerge.o \
bwtsw2_core.o bwtsw2_main.o bwtsw2_aux.o bwt_lite.o \

View File

@ -952,7 +952,6 @@ mem_aln_t mem_reg2aln(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *
}
typedef struct {
int start, step, n;
const mem_opt_t *opt;
const bwt_t *bwt;
const bntseq_t *bns;
@ -962,84 +961,51 @@ typedef struct {
mem_alnreg_v *regs;
} worker_t;
static void *worker1(void *data)
static void worker1(void *data, int i, int tid)
{
worker_t *w = (worker_t*)data;
int i;
if (!(w->opt->flag&MEM_F_PE)) {
for (i = w->start; i < w->n; i += w->step)
w->regs[i] = mem_align1_core(w->opt, w->bwt, w->bns, w->pac, w->seqs[i].l_seq, w->seqs[i].seq);
} else { // for PE we align the two ends in the same thread in case the 2nd read is of worse quality, in which case some threads may be faster/slower
for (i = w->start; i < w->n>>1; i += w->step) {
w->regs[i<<1|0] = mem_align1_core(w->opt, w->bwt, w->bns, w->pac, w->seqs[i<<1|0].l_seq, w->seqs[i<<1|0].seq);
w->regs[i<<1|1] = mem_align1_core(w->opt, w->bwt, w->bns, w->pac, w->seqs[i<<1|1].l_seq, w->seqs[i<<1|1].seq);
}
w->regs[i] = mem_align1_core(w->opt, w->bwt, w->bns, w->pac, w->seqs[i].l_seq, w->seqs[i].seq);
} else {
w->regs[i<<1|0] = mem_align1_core(w->opt, w->bwt, w->bns, w->pac, w->seqs[i<<1|0].l_seq, w->seqs[i<<1|0].seq);
w->regs[i<<1|1] = mem_align1_core(w->opt, w->bwt, w->bns, w->pac, w->seqs[i<<1|1].l_seq, w->seqs[i<<1|1].seq);
}
return 0;
}
static void *worker2(void *data)
static void worker2(void *data, int i, int tid)
{
extern int mem_sam_pe(const mem_opt_t *opt, const bntseq_t *bns, const uint8_t *pac, const mem_pestat_t pes[4], uint64_t id, bseq1_t s[2], mem_alnreg_v a[2]);
worker_t *w = (worker_t*)data;
int i;
if (!(w->opt->flag&MEM_F_PE)) {
for (i = w->start; i < w->n; i += w->step) {
mem_mark_primary_se(w->opt, w->regs[i].n, w->regs[i].a);
mem_reg2sam_se(w->opt, w->bns, w->pac, &w->seqs[i], &w->regs[i], 0, 0);
free(w->regs[i].a);
}
mem_mark_primary_se(w->opt, w->regs[i].n, w->regs[i].a);
mem_reg2sam_se(w->opt, w->bns, w->pac, &w->seqs[i], &w->regs[i], 0, 0);
free(w->regs[i].a);
} else {
int n = 0;
for (i = w->start; i < w->n>>1; i += w->step) { // not implemented yet
n += mem_sam_pe(w->opt, w->bns, w->pac, w->pes, i, &w->seqs[i<<1], &w->regs[i<<1]);
free(w->regs[i<<1|0].a); free(w->regs[i<<1|1].a);
}
fprintf(stderr, "[M::%s@%d] performed mate-SW for %d reads\n", __func__, w->start, n);
mem_sam_pe(w->opt, w->bns, w->pac, w->pes, i, &w->seqs[i<<1], &w->regs[i<<1]);
free(w->regs[i<<1|0].a); free(w->regs[i<<1|1].a);
}
return 0;
}
void mem_process_seqs(const mem_opt_t *opt, const bwt_t *bwt, const bntseq_t *bns, const uint8_t *pac, int n, bseq1_t *seqs, const mem_pestat_t *pes0)
{
extern void kt_for(int n_threads, void (*func)(void*,int,int), void *data, int n);
int i;
worker_t *w;
worker_t w;
mem_alnreg_v *regs;
mem_pestat_t pes[4];
w = calloc(opt->n_threads, sizeof(worker_t));
regs = malloc(n * sizeof(mem_alnreg_v));
for (i = 0; i < opt->n_threads; ++i) {
worker_t *p = &w[i];
p->start = i; p->step = opt->n_threads; p->n = n;
worker_t *p = &w;
p->opt = opt; p->bwt = bwt; p->bns = bns; p->pac = pac;
p->seqs = seqs; p->regs = regs;
p->pes = &pes[0];
}
#ifdef HAVE_PTHREAD
if (opt->n_threads == 1) {
#endif
worker1(w);
if (opt->flag&MEM_F_PE) { // paired-end mode
if (pes0) memcpy(pes, pes0, 4 * sizeof(mem_pestat_t)); // if pes0 != NULL, set the insert-size distribution as pes0
else mem_pestat(opt, bns->l_pac, n, regs, pes); // otherwise, infer the insert size distribution from data
}
worker2(w);
#ifdef HAVE_PTHREAD
} else {
pthread_t *tid;
tid = (pthread_t*)calloc(opt->n_threads, sizeof(pthread_t));
for (i = 0; i < opt->n_threads; ++i) pthread_create(&tid[i], 0, worker1, &w[i]);
for (i = 0; i < opt->n_threads; ++i) pthread_join(tid[i], 0);
if (opt->flag&MEM_F_PE) {
if (pes0) memcpy(pes, pes0, 4 * sizeof(mem_pestat_t));
else mem_pestat(opt, bns->l_pac, n, regs, pes);
}
for (i = 0; i < opt->n_threads; ++i) pthread_create(&tid[i], 0, worker2, &w[i]);
for (i = 0; i < opt->n_threads; ++i) pthread_join(tid[i], 0);
free(tid);
kt_for(opt->n_threads, worker1, &w, (opt->flag&MEM_F_PE)? n>>1 : n); // find mapping positions
if (opt->flag&MEM_F_PE) { // infer insert sizes if not provided
if (pes0) memcpy(pes, pes0, 4 * sizeof(mem_pestat_t)); // if pes0 != NULL, set the insert-size distribution as pes0
else mem_pestat(opt, bns->l_pac, n, regs, pes); // otherwise, infer the insert size distribution from data
}
#endif
free(regs); free(w);
kt_for(opt->n_threads, worker2, &w, (opt->flag&MEM_F_PE)? n>>1 : n); // generate alignment
free(regs);
}

53
kthread.c 100644
View File

@ -0,0 +1,53 @@
#include <pthread.h>
#include <stdlib.h>
struct kt_for_t;
typedef struct {
struct kt_for_t *t;
int i;
} ktf_worker_t;
typedef struct kt_for_t {
int n_threads, n;
ktf_worker_t *w;
void (*func)(void*,int,int);
void *data;
} kt_for_t;
static inline int steal_work(kt_for_t *t)
{
int i, k, min = 0x7fffffff, min_i = -1;
for (i = 0; i < t->n_threads; ++i)
if (min > t->w[i].i) min = t->w[i].i, min_i = i;
k = __sync_fetch_and_add(&t->w[min_i].i, t->n_threads);
return k >= t->n? -1 : k;
}
static void *ktf_worker(void *data)
{
ktf_worker_t *w = (ktf_worker_t*)data;
int i;
for (;;) {
i = __sync_fetch_and_add(&w->i, w->t->n_threads);
if (i >= w->t->n) break;
w->t->func(w->t->data, i, w - w->t->w);
}
while ((i = steal_work(w->t)) >= 0)
w->t->func(w->t->data, i, w - w->t->w);
pthread_exit(0);
}
void kt_for(int n_threads, void (*func)(void*,int,int), void *data, int n)
{
int i;
kt_for_t t;
pthread_t *tid;
t.func = func, t.data = data, t.n_threads = n_threads, t.n = n;
t.w = (ktf_worker_t*)alloca(n_threads * sizeof(ktf_worker_t));
tid = (pthread_t*)alloca(n_threads * sizeof(pthread_t));
for (i = 0; i < n_threads; ++i)
t.w[i].t = &t, t.w[i].i = i;
for (i = 0; i < n_threads; ++i) pthread_create(&tid[i], 0, ktf_worker, &t.w[i]);
for (i = 0; i < n_threads; ++i) pthread_join(tid[i], 0);
}