From 9d8c906e4c2de93c240a8d27d59e77c493d17c57 Mon Sep 17 00:00:00 2001 From: Heng Li Date: Wed, 26 Nov 2014 12:32:48 -0500 Subject: [PATCH] r1017: process I/O in a separate thread Previously, bwa-mem waits for I/O. When the input data comes from a slow source (I/O or piped from a slow program), bwa-mem may spend significant amount of wall-clock time in the single-thread mode. The same may also happen when bwa-mem writes to slow target. This commit uses two sequence buffers. it allows bwa-mem to map one buffer while filling or dumping the other buffer. When bwa-mem is run on 16 threads using the bwa.kit pipeline, the wall clock time is reduced by 30%. --- fastmap.c | 163 +++++++++++++++++++++++++++++++++--------------------- kthread.c | 104 +++++++++++++++++++++++++++++++--- main.c | 2 +- 3 files changed, 199 insertions(+), 70 deletions(-) diff --git a/fastmap.c b/fastmap.c index 312b1bc..537fff7 100644 --- a/fastmap.c +++ b/fastmap.c @@ -18,6 +18,83 @@ extern unsigned char nst_nt4_table[256]; void *kopen(const char *fn, int *_fd); int kclose(void *a); +void kt_pipeline(int n_threads, void *(*func)(void*, int, void*), void *shared_data, int n_steps); + +typedef struct { + kseq_t *ks, *ks2; + mem_opt_t *opt; + mem_pestat_t *pes0; + int64_t n_processed; + int copy_comment, actual_chunk_size; + bwaidx_t *idx; +} ktp_aux_t; + +typedef struct { + ktp_aux_t *aux; + int n_seqs; + bseq1_t *seqs; +} ktp_data_t; + +static void *process(void *shared, int step, void *_data) +{ + ktp_aux_t *aux = (ktp_aux_t*)shared; + ktp_data_t *data = (ktp_data_t*)_data; + int i; + if (step == 0) { + ktp_data_t *ret; + int64_t size = 0; + ret = calloc(1, sizeof(ktp_data_t)); + ret->seqs = bseq_read(aux->actual_chunk_size, &ret->n_seqs, aux->ks, aux->ks2); + if (ret->seqs == 0) { + free(ret); + return 0; + } + if (!aux->copy_comment) + for (i = 0; i < ret->n_seqs; ++i) { + free(ret->seqs[i].comment); + ret->seqs[i].comment = 0; + } + for (i = 0; i < ret->n_seqs; ++i) size += ret->seqs[i].l_seq; + if (bwa_verbose >= 3) + fprintf(stderr, "[M::%s] read %d sequences (%ld bp)...\n", __func__, ret->n_seqs, (long)size); + return ret; + } else if (step == 1) { + const mem_opt_t *opt = aux->opt; + const bwaidx_t *idx = aux->idx; + if (opt->flag & MEM_F_SMARTPE) { + bseq1_t *sep[2]; + int n_sep[2]; + mem_opt_t tmp_opt = *opt; + bseq_classify(data->n_seqs, data->seqs, n_sep, sep); + if (bwa_verbose >= 3) + fprintf(stderr, "[M::%s] %d single-end sequences; %d paired-end sequences\n", __func__, n_sep[0], n_sep[1]); + if (n_sep[0]) { + tmp_opt.flag &= ~MEM_F_PE; + mem_process_seqs(&tmp_opt, idx->bwt, idx->bns, idx->pac, aux->n_processed, n_sep[0], sep[0], 0); + for (i = 0; i < n_sep[0]; ++i) + data->seqs[sep[0][i].id].sam = sep[0][i].sam; + } + if (n_sep[1]) { + tmp_opt.flag |= MEM_F_PE; + mem_process_seqs(&tmp_opt, idx->bwt, idx->bns, idx->pac, aux->n_processed + n_sep[0], n_sep[1], sep[1], aux->pes0); + for (i = 0; i < n_sep[1]; ++i) + data->seqs[sep[1][i].id].sam = sep[1][i].sam; + } + free(sep[0]); free(sep[1]); + } else mem_process_seqs(opt, idx->bwt, idx->bns, idx->pac, aux->n_processed, data->n_seqs, data->seqs, aux->pes0); + aux->n_processed += data->n_seqs; + return data; + } else if (step == 2) { + for (i = 0; i < data->n_seqs; ++i) { + if (data->seqs[i].sam) err_fputs(data->seqs[i].sam, stdout); + free(data->seqs[i].name); free(data->seqs[i].comment); + free(data->seqs[i].seq); free(data->seqs[i].qual); free(data->seqs[i].sam); + } + free(data->seqs); free(data); + return 0; + } + return 0; +} static void update_a(mem_opt_t *opt, const mem_opt_t *opt0) { @@ -38,25 +115,24 @@ static void update_a(mem_opt_t *opt, const mem_opt_t *opt0) int main_mem(int argc, char *argv[]) { mem_opt_t *opt, opt0; - int fd, fd2, i, c, n, copy_comment = 0, ignore_alt = 0; - int fixed_chunk_size = -1, actual_chunk_size; + int fd, fd2, i, c, ignore_alt = 0, no_mt_io = 0; + int fixed_chunk_size = -1; gzFile fp, fp2 = 0; - kseq_t *ks, *ks2 = 0; - bseq1_t *seqs; - bwaidx_t *idx; char *p, *rg_line = 0, *hdr_line = 0; const char *mode = 0; void *ko = 0, *ko2 = 0; - int64_t n_processed = 0; - mem_pestat_t pes[4], *pes0 = 0; + mem_pestat_t pes[4]; + ktp_aux_t aux; + memset(&aux, 0, sizeof(ktp_aux_t)); memset(pes, 0, 4 * sizeof(mem_pestat_t)); for (i = 0; i < 4; ++i) pes[i].failed = 1; - opt = mem_opt_init(); + aux.opt = opt = mem_opt_init(); memset(&opt0, 0, sizeof(mem_opt_t)); - while ((c = getopt(argc, argv, "epaFMCSPVYjk:c:v:s:r:t:R:A:B:O:E:U:w:L:d:T:Q:D:m:I:N:W:x:G:h:y:K:X:H:")) >= 0) { + while ((c = getopt(argc, argv, "1epaFMCSPVYjk:c:v:s:r:t:R:A:B:O:E:U:w:L:d:T:Q:D:m:I:N:W:x:G:h:y:K:X:H:")) >= 0) { if (c == 'k') opt->min_seed_len = atoi(optarg), opt0.min_seed_len = 1; + else if (c == '1') no_mt_io = 1; else if (c == 'x') mode = optarg; else if (c == 'w') opt->w = atoi(optarg), opt0.w = 1; else if (c == 'A') opt->a = atoi(optarg), opt0.a = 1; @@ -85,7 +161,7 @@ int main_mem(int argc, char *argv[]) else if (c == 'N') opt->max_chain_extend = atoi(optarg), opt0.max_chain_extend = 1; else if (c == 'W') opt->min_chain_weight = atoi(optarg), opt0.min_chain_weight = 1; else if (c == 'y') opt->max_mem_intv = atol(optarg), opt0.max_mem_intv = 1; - else if (c == 'C') copy_comment = 1; + else if (c == 'C') aux.copy_comment = 1; else if (c == 'K') fixed_chunk_size = atoi(optarg); else if (c == 'X') opt->mask_level = atof(optarg); else if (c == 'h') { @@ -118,7 +194,7 @@ int main_mem(int argc, char *argv[]) } else if (c == 'H') { hdr_line = bwa_insert_header(optarg, hdr_line); } else if (c == 'I') { // specify the insert size distribution - pes0 = pes; + aux.pes0 = pes; pes[1].failed = 0; pes[1].avg = strtod(optarg, &p); pes[1].std = pes[1].avg * .1; @@ -238,14 +314,14 @@ int main_mem(int argc, char *argv[]) } else update_a(opt, &opt0); bwa_fill_scmat(opt->a, opt->b, opt->mat); - idx = bwa_idx_load_from_shm(argv[optind]); - if (idx == 0) { - if ((idx = bwa_idx_load(argv[optind], BWA_IDX_ALL)) == 0) return 1; // FIXME: memory leak + aux.idx = bwa_idx_load_from_shm(argv[optind]); + if (aux.idx == 0) { + if ((aux.idx = bwa_idx_load(argv[optind], BWA_IDX_ALL)) == 0) return 1; // FIXME: memory leak } else if (bwa_verbose >= 3) fprintf(stderr, "[M::%s] load the bwa index from shared memory\n", __func__); if (ignore_alt) - for (i = 0; i < idx->bns->n_seqs; ++i) - idx->bns->anns[i].is_alt = 0; + for (i = 0; i < aux.idx->bns->n_seqs; ++i) + aux.idx->bns->anns[i].is_alt = 0; ko = kopen(argv[optind + 1], &fd); if (ko == 0) { @@ -253,7 +329,7 @@ int main_mem(int argc, char *argv[]) return 1; } fp = gzdopen(fd, "r"); - ks = kseq_init(fp); + aux.ks = kseq_init(fp); if (optind + 2 < argc) { if (opt->flag&MEM_F_PE) { if (bwa_verbose >= 2) @@ -265,58 +341,21 @@ int main_mem(int argc, char *argv[]) return 1; } fp2 = gzdopen(fd2, "r"); - ks2 = kseq_init(fp2); + aux.ks2 = kseq_init(fp2); opt->flag |= MEM_F_PE; } } if (!(opt->flag & MEM_F_ALN_REG)) - bwa_print_sam_hdr(idx->bns, hdr_line); - actual_chunk_size = fixed_chunk_size > 0? fixed_chunk_size : opt->chunk_size * opt->n_threads; - while ((seqs = bseq_read(actual_chunk_size, &n, ks, ks2)) != 0) { - int64_t size = 0; - if (!copy_comment) - for (i = 0; i < n; ++i) { - free(seqs[i].comment); seqs[i].comment = 0; - } - for (i = 0; i < n; ++i) size += seqs[i].l_seq; - if (bwa_verbose >= 3) - fprintf(stderr, "[M::%s] read %d sequences (%ld bp)...\n", __func__, n, (long)size); - if (opt->flag & MEM_F_SMARTPE) { - bseq1_t *sep[2]; - int i, n_sep[2]; - mem_opt_t tmp_opt = *opt; - bseq_classify(n, seqs, n_sep, sep); - if (bwa_verbose >= 3) - fprintf(stderr, "[M::%s] %d single-end sequences; %d paired-end sequences\n", __func__, n_sep[0], n_sep[1]); - if (n_sep[0]) { - tmp_opt.flag &= ~MEM_F_PE; - mem_process_seqs(&tmp_opt, idx->bwt, idx->bns, idx->pac, n_processed, n_sep[0], sep[0], 0); - for (i = 0; i < n_sep[0]; ++i) - seqs[sep[0][i].id].sam = sep[0][i].sam; - } - if (n_sep[1]) { - tmp_opt.flag |= MEM_F_PE; - mem_process_seqs(&tmp_opt, idx->bwt, idx->bns, idx->pac, n_processed + n_sep[0], n_sep[1], sep[1], pes0); - for (i = 0; i < n_sep[1]; ++i) - seqs[sep[1][i].id].sam = sep[1][i].sam; - } - free(sep[0]); free(sep[1]); - } else mem_process_seqs(opt, idx->bwt, idx->bns, idx->pac, n_processed, n, seqs, pes0); - n_processed += n; - for (i = 0; i < n; ++i) { - if (seqs[i].sam) err_fputs(seqs[i].sam, stdout); - free(seqs[i].name); free(seqs[i].comment); free(seqs[i].seq); free(seqs[i].qual); free(seqs[i].sam); - } - free(seqs); - } - + bwa_print_sam_hdr(aux.idx->bns, hdr_line); + aux.actual_chunk_size = fixed_chunk_size > 0? fixed_chunk_size : opt->chunk_size * opt->n_threads; + kt_pipeline(no_mt_io? 1 : 2, process, &aux, 3); free(hdr_line); free(opt); - bwa_idx_destroy(idx); - kseq_destroy(ks); + bwa_idx_destroy(aux.idx); + kseq_destroy(aux.ks); err_gzclose(fp); kclose(ko); - if (ks2) { - kseq_destroy(ks2); + if (aux.ks2) { + kseq_destroy(aux.ks2); err_gzclose(fp2); kclose(ko2); } return 0; diff --git a/kthread.c b/kthread.c index a44426b..80f84cb 100644 --- a/kthread.c +++ b/kthread.c @@ -1,23 +1,30 @@ #include #include +#include + +/************ + * kt_for() * + ************/ struct kt_for_t; typedef struct { struct kt_for_t *t; - int i; + long i; } ktf_worker_t; typedef struct kt_for_t { - int n_threads, n; + int n_threads; + long n; ktf_worker_t *w; - void (*func)(void*,int,int); + void (*func)(void*,long,int); void *data; } kt_for_t; -static inline int steal_work(kt_for_t *t) +static inline long steal_work(kt_for_t *t) { - int i, k, min = 0x7fffffff, min_i = -1; + int i, min_i = -1; + long k, min = LONG_MAX; for (i = 0; i < t->n_threads; ++i) if (min > t->w[i].i) min = t->w[i].i, min_i = i; k = __sync_fetch_and_add(&t->w[min_i].i, t->n_threads); @@ -27,7 +34,7 @@ static inline int steal_work(kt_for_t *t) static void *ktf_worker(void *data) { ktf_worker_t *w = (ktf_worker_t*)data; - int i; + long i; for (;;) { i = __sync_fetch_and_add(&w->i, w->t->n_threads); if (i >= w->t->n) break; @@ -38,7 +45,7 @@ static void *ktf_worker(void *data) pthread_exit(0); } -void kt_for(int n_threads, void (*func)(void*,int,int), void *data, int n) +void kt_for(int n_threads, void (*func)(void*,long,int), void *data, long n) { int i; kt_for_t t; @@ -51,3 +58,86 @@ void kt_for(int n_threads, void (*func)(void*,int,int), void *data, int n) for (i = 0; i < n_threads; ++i) pthread_create(&tid[i], 0, ktf_worker, &t.w[i]); for (i = 0; i < n_threads; ++i) pthread_join(tid[i], 0); } + +/***************** + * kt_pipeline() * + *****************/ + +struct ktp_t; + +typedef struct { + struct ktp_t *pl; + int step, running; + void *data; +} ktp_worker_t; + +typedef struct ktp_t { + void *shared; + void *(*func)(void*, int, void*); + int n_workers, n_steps; + ktp_worker_t *workers; + pthread_mutex_t mutex; + pthread_cond_t cv; +} ktp_t; + +static void *ktp_worker(void *data) +{ + ktp_worker_t *w = (ktp_worker_t*)data; + ktp_t *p = w->pl; + while (w->step < p->n_steps) { + // test whether we can kick off the job with this worker + pthread_mutex_lock(&p->mutex); + for (;;) { + int i; + // test whether another worker is doing the same step + for (i = 0; i < p->n_workers; ++i) { + if (w == &p->workers[i]) continue; // ignore itself + if (p->workers[i].running && p->workers[i].step == w->step) + break; + } + if (i == p->n_workers) break; // no other workers doing w->step; then this worker will + pthread_cond_wait(&p->cv, &p->mutex); + } + w->running = 1; + pthread_mutex_unlock(&p->mutex); + + // working on w->step + w->data = p->func(p->shared, w->step, w->step? w->data : 0); // for the first step, input is NULL + + // update step and let other workers know + pthread_mutex_lock(&p->mutex); + w->step = w->step == p->n_steps - 1 || w->data? (w->step + 1) % p->n_steps : p->n_steps; + w->running = 0; + pthread_cond_broadcast(&p->cv); + pthread_mutex_unlock(&p->mutex); + } + pthread_exit(0); +} + +void kt_pipeline(int n_threads, void *(*func)(void*, int, void*), void *shared_data, int n_steps) +{ + ktp_t aux; + pthread_t *tid; + int i; + + if (n_threads < 1) n_threads = 1; + aux.n_workers = n_threads; + aux.n_steps = n_steps; + aux.func = func; + aux.shared = shared_data; + pthread_mutex_init(&aux.mutex, 0); + pthread_cond_init(&aux.cv, 0); + + aux.workers = alloca(n_threads * sizeof(ktp_worker_t)); + for (i = 0; i < n_threads; ++i) { + ktp_worker_t *w = &aux.workers[i]; + w->step = w->running = 0; w->pl = &aux; w->data = 0; + } + + tid = alloca(n_threads * sizeof(pthread_t)); + for (i = 0; i < n_threads; ++i) pthread_create(&tid[i], 0, ktp_worker, &aux.workers[i]); + for (i = 0; i < n_threads; ++i) pthread_join(tid[i], 0); + + pthread_mutex_destroy(&aux.mutex); + pthread_cond_destroy(&aux.cv); +} diff --git a/main.c b/main.c index d7ffb13..8216811 100644 --- a/main.c +++ b/main.c @@ -4,7 +4,7 @@ #include "utils.h" #ifndef PACKAGE_VERSION -#define PACKAGE_VERSION "0.7.10-r1005-dirty" +#define PACKAGE_VERSION "0.7.10-r1017-dirty" #endif int bwa_fa2pac(int argc, char *argv[]);