r1017: process I/O in a separate thread
Previously, bwa-mem waits for I/O. When the input data comes from a slow source (I/O or piped from a slow program), bwa-mem may spend significant amount of wall-clock time in the single-thread mode. The same may also happen when bwa-mem writes to slow target. This commit uses two sequence buffers. it allows bwa-mem to map one buffer while filling or dumping the other buffer. When bwa-mem is run on 16 threads using the bwa.kit pipeline, the wall clock time is reduced by 30%.
This commit is contained in:
parent
3633436a3d
commit
9d8c906e4c
163
fastmap.c
163
fastmap.c
|
|
@ -18,6 +18,83 @@ extern unsigned char nst_nt4_table[256];
|
|||
|
||||
void *kopen(const char *fn, int *_fd);
|
||||
int kclose(void *a);
|
||||
void kt_pipeline(int n_threads, void *(*func)(void*, int, void*), void *shared_data, int n_steps);
|
||||
|
||||
typedef struct {
|
||||
kseq_t *ks, *ks2;
|
||||
mem_opt_t *opt;
|
||||
mem_pestat_t *pes0;
|
||||
int64_t n_processed;
|
||||
int copy_comment, actual_chunk_size;
|
||||
bwaidx_t *idx;
|
||||
} ktp_aux_t;
|
||||
|
||||
typedef struct {
|
||||
ktp_aux_t *aux;
|
||||
int n_seqs;
|
||||
bseq1_t *seqs;
|
||||
} ktp_data_t;
|
||||
|
||||
static void *process(void *shared, int step, void *_data)
|
||||
{
|
||||
ktp_aux_t *aux = (ktp_aux_t*)shared;
|
||||
ktp_data_t *data = (ktp_data_t*)_data;
|
||||
int i;
|
||||
if (step == 0) {
|
||||
ktp_data_t *ret;
|
||||
int64_t size = 0;
|
||||
ret = calloc(1, sizeof(ktp_data_t));
|
||||
ret->seqs = bseq_read(aux->actual_chunk_size, &ret->n_seqs, aux->ks, aux->ks2);
|
||||
if (ret->seqs == 0) {
|
||||
free(ret);
|
||||
return 0;
|
||||
}
|
||||
if (!aux->copy_comment)
|
||||
for (i = 0; i < ret->n_seqs; ++i) {
|
||||
free(ret->seqs[i].comment);
|
||||
ret->seqs[i].comment = 0;
|
||||
}
|
||||
for (i = 0; i < ret->n_seqs; ++i) size += ret->seqs[i].l_seq;
|
||||
if (bwa_verbose >= 3)
|
||||
fprintf(stderr, "[M::%s] read %d sequences (%ld bp)...\n", __func__, ret->n_seqs, (long)size);
|
||||
return ret;
|
||||
} else if (step == 1) {
|
||||
const mem_opt_t *opt = aux->opt;
|
||||
const bwaidx_t *idx = aux->idx;
|
||||
if (opt->flag & MEM_F_SMARTPE) {
|
||||
bseq1_t *sep[2];
|
||||
int n_sep[2];
|
||||
mem_opt_t tmp_opt = *opt;
|
||||
bseq_classify(data->n_seqs, data->seqs, n_sep, sep);
|
||||
if (bwa_verbose >= 3)
|
||||
fprintf(stderr, "[M::%s] %d single-end sequences; %d paired-end sequences\n", __func__, n_sep[0], n_sep[1]);
|
||||
if (n_sep[0]) {
|
||||
tmp_opt.flag &= ~MEM_F_PE;
|
||||
mem_process_seqs(&tmp_opt, idx->bwt, idx->bns, idx->pac, aux->n_processed, n_sep[0], sep[0], 0);
|
||||
for (i = 0; i < n_sep[0]; ++i)
|
||||
data->seqs[sep[0][i].id].sam = sep[0][i].sam;
|
||||
}
|
||||
if (n_sep[1]) {
|
||||
tmp_opt.flag |= MEM_F_PE;
|
||||
mem_process_seqs(&tmp_opt, idx->bwt, idx->bns, idx->pac, aux->n_processed + n_sep[0], n_sep[1], sep[1], aux->pes0);
|
||||
for (i = 0; i < n_sep[1]; ++i)
|
||||
data->seqs[sep[1][i].id].sam = sep[1][i].sam;
|
||||
}
|
||||
free(sep[0]); free(sep[1]);
|
||||
} else mem_process_seqs(opt, idx->bwt, idx->bns, idx->pac, aux->n_processed, data->n_seqs, data->seqs, aux->pes0);
|
||||
aux->n_processed += data->n_seqs;
|
||||
return data;
|
||||
} else if (step == 2) {
|
||||
for (i = 0; i < data->n_seqs; ++i) {
|
||||
if (data->seqs[i].sam) err_fputs(data->seqs[i].sam, stdout);
|
||||
free(data->seqs[i].name); free(data->seqs[i].comment);
|
||||
free(data->seqs[i].seq); free(data->seqs[i].qual); free(data->seqs[i].sam);
|
||||
}
|
||||
free(data->seqs); free(data);
|
||||
return 0;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void update_a(mem_opt_t *opt, const mem_opt_t *opt0)
|
||||
{
|
||||
|
|
@ -38,25 +115,24 @@ static void update_a(mem_opt_t *opt, const mem_opt_t *opt0)
|
|||
int main_mem(int argc, char *argv[])
|
||||
{
|
||||
mem_opt_t *opt, opt0;
|
||||
int fd, fd2, i, c, n, copy_comment = 0, ignore_alt = 0;
|
||||
int fixed_chunk_size = -1, actual_chunk_size;
|
||||
int fd, fd2, i, c, ignore_alt = 0, no_mt_io = 0;
|
||||
int fixed_chunk_size = -1;
|
||||
gzFile fp, fp2 = 0;
|
||||
kseq_t *ks, *ks2 = 0;
|
||||
bseq1_t *seqs;
|
||||
bwaidx_t *idx;
|
||||
char *p, *rg_line = 0, *hdr_line = 0;
|
||||
const char *mode = 0;
|
||||
void *ko = 0, *ko2 = 0;
|
||||
int64_t n_processed = 0;
|
||||
mem_pestat_t pes[4], *pes0 = 0;
|
||||
mem_pestat_t pes[4];
|
||||
ktp_aux_t aux;
|
||||
|
||||
memset(&aux, 0, sizeof(ktp_aux_t));
|
||||
memset(pes, 0, 4 * sizeof(mem_pestat_t));
|
||||
for (i = 0; i < 4; ++i) pes[i].failed = 1;
|
||||
|
||||
opt = mem_opt_init();
|
||||
aux.opt = opt = mem_opt_init();
|
||||
memset(&opt0, 0, sizeof(mem_opt_t));
|
||||
while ((c = getopt(argc, argv, "epaFMCSPVYjk:c:v:s:r:t:R:A:B:O:E:U:w:L:d:T:Q:D:m:I:N:W:x:G:h:y:K:X:H:")) >= 0) {
|
||||
while ((c = getopt(argc, argv, "1epaFMCSPVYjk:c:v:s:r:t:R:A:B:O:E:U:w:L:d:T:Q:D:m:I:N:W:x:G:h:y:K:X:H:")) >= 0) {
|
||||
if (c == 'k') opt->min_seed_len = atoi(optarg), opt0.min_seed_len = 1;
|
||||
else if (c == '1') no_mt_io = 1;
|
||||
else if (c == 'x') mode = optarg;
|
||||
else if (c == 'w') opt->w = atoi(optarg), opt0.w = 1;
|
||||
else if (c == 'A') opt->a = atoi(optarg), opt0.a = 1;
|
||||
|
|
@ -85,7 +161,7 @@ int main_mem(int argc, char *argv[])
|
|||
else if (c == 'N') opt->max_chain_extend = atoi(optarg), opt0.max_chain_extend = 1;
|
||||
else if (c == 'W') opt->min_chain_weight = atoi(optarg), opt0.min_chain_weight = 1;
|
||||
else if (c == 'y') opt->max_mem_intv = atol(optarg), opt0.max_mem_intv = 1;
|
||||
else if (c == 'C') copy_comment = 1;
|
||||
else if (c == 'C') aux.copy_comment = 1;
|
||||
else if (c == 'K') fixed_chunk_size = atoi(optarg);
|
||||
else if (c == 'X') opt->mask_level = atof(optarg);
|
||||
else if (c == 'h') {
|
||||
|
|
@ -118,7 +194,7 @@ int main_mem(int argc, char *argv[])
|
|||
} else if (c == 'H') {
|
||||
hdr_line = bwa_insert_header(optarg, hdr_line);
|
||||
} else if (c == 'I') { // specify the insert size distribution
|
||||
pes0 = pes;
|
||||
aux.pes0 = pes;
|
||||
pes[1].failed = 0;
|
||||
pes[1].avg = strtod(optarg, &p);
|
||||
pes[1].std = pes[1].avg * .1;
|
||||
|
|
@ -238,14 +314,14 @@ int main_mem(int argc, char *argv[])
|
|||
} else update_a(opt, &opt0);
|
||||
bwa_fill_scmat(opt->a, opt->b, opt->mat);
|
||||
|
||||
idx = bwa_idx_load_from_shm(argv[optind]);
|
||||
if (idx == 0) {
|
||||
if ((idx = bwa_idx_load(argv[optind], BWA_IDX_ALL)) == 0) return 1; // FIXME: memory leak
|
||||
aux.idx = bwa_idx_load_from_shm(argv[optind]);
|
||||
if (aux.idx == 0) {
|
||||
if ((aux.idx = bwa_idx_load(argv[optind], BWA_IDX_ALL)) == 0) return 1; // FIXME: memory leak
|
||||
} else if (bwa_verbose >= 3)
|
||||
fprintf(stderr, "[M::%s] load the bwa index from shared memory\n", __func__);
|
||||
if (ignore_alt)
|
||||
for (i = 0; i < idx->bns->n_seqs; ++i)
|
||||
idx->bns->anns[i].is_alt = 0;
|
||||
for (i = 0; i < aux.idx->bns->n_seqs; ++i)
|
||||
aux.idx->bns->anns[i].is_alt = 0;
|
||||
|
||||
ko = kopen(argv[optind + 1], &fd);
|
||||
if (ko == 0) {
|
||||
|
|
@ -253,7 +329,7 @@ int main_mem(int argc, char *argv[])
|
|||
return 1;
|
||||
}
|
||||
fp = gzdopen(fd, "r");
|
||||
ks = kseq_init(fp);
|
||||
aux.ks = kseq_init(fp);
|
||||
if (optind + 2 < argc) {
|
||||
if (opt->flag&MEM_F_PE) {
|
||||
if (bwa_verbose >= 2)
|
||||
|
|
@ -265,58 +341,21 @@ int main_mem(int argc, char *argv[])
|
|||
return 1;
|
||||
}
|
||||
fp2 = gzdopen(fd2, "r");
|
||||
ks2 = kseq_init(fp2);
|
||||
aux.ks2 = kseq_init(fp2);
|
||||
opt->flag |= MEM_F_PE;
|
||||
}
|
||||
}
|
||||
if (!(opt->flag & MEM_F_ALN_REG))
|
||||
bwa_print_sam_hdr(idx->bns, hdr_line);
|
||||
actual_chunk_size = fixed_chunk_size > 0? fixed_chunk_size : opt->chunk_size * opt->n_threads;
|
||||
while ((seqs = bseq_read(actual_chunk_size, &n, ks, ks2)) != 0) {
|
||||
int64_t size = 0;
|
||||
if (!copy_comment)
|
||||
for (i = 0; i < n; ++i) {
|
||||
free(seqs[i].comment); seqs[i].comment = 0;
|
||||
}
|
||||
for (i = 0; i < n; ++i) size += seqs[i].l_seq;
|
||||
if (bwa_verbose >= 3)
|
||||
fprintf(stderr, "[M::%s] read %d sequences (%ld bp)...\n", __func__, n, (long)size);
|
||||
if (opt->flag & MEM_F_SMARTPE) {
|
||||
bseq1_t *sep[2];
|
||||
int i, n_sep[2];
|
||||
mem_opt_t tmp_opt = *opt;
|
||||
bseq_classify(n, seqs, n_sep, sep);
|
||||
if (bwa_verbose >= 3)
|
||||
fprintf(stderr, "[M::%s] %d single-end sequences; %d paired-end sequences\n", __func__, n_sep[0], n_sep[1]);
|
||||
if (n_sep[0]) {
|
||||
tmp_opt.flag &= ~MEM_F_PE;
|
||||
mem_process_seqs(&tmp_opt, idx->bwt, idx->bns, idx->pac, n_processed, n_sep[0], sep[0], 0);
|
||||
for (i = 0; i < n_sep[0]; ++i)
|
||||
seqs[sep[0][i].id].sam = sep[0][i].sam;
|
||||
}
|
||||
if (n_sep[1]) {
|
||||
tmp_opt.flag |= MEM_F_PE;
|
||||
mem_process_seqs(&tmp_opt, idx->bwt, idx->bns, idx->pac, n_processed + n_sep[0], n_sep[1], sep[1], pes0);
|
||||
for (i = 0; i < n_sep[1]; ++i)
|
||||
seqs[sep[1][i].id].sam = sep[1][i].sam;
|
||||
}
|
||||
free(sep[0]); free(sep[1]);
|
||||
} else mem_process_seqs(opt, idx->bwt, idx->bns, idx->pac, n_processed, n, seqs, pes0);
|
||||
n_processed += n;
|
||||
for (i = 0; i < n; ++i) {
|
||||
if (seqs[i].sam) err_fputs(seqs[i].sam, stdout);
|
||||
free(seqs[i].name); free(seqs[i].comment); free(seqs[i].seq); free(seqs[i].qual); free(seqs[i].sam);
|
||||
}
|
||||
free(seqs);
|
||||
}
|
||||
|
||||
bwa_print_sam_hdr(aux.idx->bns, hdr_line);
|
||||
aux.actual_chunk_size = fixed_chunk_size > 0? fixed_chunk_size : opt->chunk_size * opt->n_threads;
|
||||
kt_pipeline(no_mt_io? 1 : 2, process, &aux, 3);
|
||||
free(hdr_line);
|
||||
free(opt);
|
||||
bwa_idx_destroy(idx);
|
||||
kseq_destroy(ks);
|
||||
bwa_idx_destroy(aux.idx);
|
||||
kseq_destroy(aux.ks);
|
||||
err_gzclose(fp); kclose(ko);
|
||||
if (ks2) {
|
||||
kseq_destroy(ks2);
|
||||
if (aux.ks2) {
|
||||
kseq_destroy(aux.ks2);
|
||||
err_gzclose(fp2); kclose(ko2);
|
||||
}
|
||||
return 0;
|
||||
|
|
|
|||
104
kthread.c
104
kthread.c
|
|
@ -1,23 +1,30 @@
|
|||
#include <pthread.h>
|
||||
#include <stdlib.h>
|
||||
#include <limits.h>
|
||||
|
||||
/************
|
||||
* kt_for() *
|
||||
************/
|
||||
|
||||
struct kt_for_t;
|
||||
|
||||
typedef struct {
|
||||
struct kt_for_t *t;
|
||||
int i;
|
||||
long i;
|
||||
} ktf_worker_t;
|
||||
|
||||
typedef struct kt_for_t {
|
||||
int n_threads, n;
|
||||
int n_threads;
|
||||
long n;
|
||||
ktf_worker_t *w;
|
||||
void (*func)(void*,int,int);
|
||||
void (*func)(void*,long,int);
|
||||
void *data;
|
||||
} kt_for_t;
|
||||
|
||||
static inline int steal_work(kt_for_t *t)
|
||||
static inline long steal_work(kt_for_t *t)
|
||||
{
|
||||
int i, k, min = 0x7fffffff, min_i = -1;
|
||||
int i, min_i = -1;
|
||||
long k, min = LONG_MAX;
|
||||
for (i = 0; i < t->n_threads; ++i)
|
||||
if (min > t->w[i].i) min = t->w[i].i, min_i = i;
|
||||
k = __sync_fetch_and_add(&t->w[min_i].i, t->n_threads);
|
||||
|
|
@ -27,7 +34,7 @@ static inline int steal_work(kt_for_t *t)
|
|||
static void *ktf_worker(void *data)
|
||||
{
|
||||
ktf_worker_t *w = (ktf_worker_t*)data;
|
||||
int i;
|
||||
long i;
|
||||
for (;;) {
|
||||
i = __sync_fetch_and_add(&w->i, w->t->n_threads);
|
||||
if (i >= w->t->n) break;
|
||||
|
|
@ -38,7 +45,7 @@ static void *ktf_worker(void *data)
|
|||
pthread_exit(0);
|
||||
}
|
||||
|
||||
void kt_for(int n_threads, void (*func)(void*,int,int), void *data, int n)
|
||||
void kt_for(int n_threads, void (*func)(void*,long,int), void *data, long n)
|
||||
{
|
||||
int i;
|
||||
kt_for_t t;
|
||||
|
|
@ -51,3 +58,86 @@ void kt_for(int n_threads, void (*func)(void*,int,int), void *data, int n)
|
|||
for (i = 0; i < n_threads; ++i) pthread_create(&tid[i], 0, ktf_worker, &t.w[i]);
|
||||
for (i = 0; i < n_threads; ++i) pthread_join(tid[i], 0);
|
||||
}
|
||||
|
||||
/*****************
|
||||
* kt_pipeline() *
|
||||
*****************/
|
||||
|
||||
struct ktp_t;
|
||||
|
||||
typedef struct {
|
||||
struct ktp_t *pl;
|
||||
int step, running;
|
||||
void *data;
|
||||
} ktp_worker_t;
|
||||
|
||||
typedef struct ktp_t {
|
||||
void *shared;
|
||||
void *(*func)(void*, int, void*);
|
||||
int n_workers, n_steps;
|
||||
ktp_worker_t *workers;
|
||||
pthread_mutex_t mutex;
|
||||
pthread_cond_t cv;
|
||||
} ktp_t;
|
||||
|
||||
static void *ktp_worker(void *data)
|
||||
{
|
||||
ktp_worker_t *w = (ktp_worker_t*)data;
|
||||
ktp_t *p = w->pl;
|
||||
while (w->step < p->n_steps) {
|
||||
// test whether we can kick off the job with this worker
|
||||
pthread_mutex_lock(&p->mutex);
|
||||
for (;;) {
|
||||
int i;
|
||||
// test whether another worker is doing the same step
|
||||
for (i = 0; i < p->n_workers; ++i) {
|
||||
if (w == &p->workers[i]) continue; // ignore itself
|
||||
if (p->workers[i].running && p->workers[i].step == w->step)
|
||||
break;
|
||||
}
|
||||
if (i == p->n_workers) break; // no other workers doing w->step; then this worker will
|
||||
pthread_cond_wait(&p->cv, &p->mutex);
|
||||
}
|
||||
w->running = 1;
|
||||
pthread_mutex_unlock(&p->mutex);
|
||||
|
||||
// working on w->step
|
||||
w->data = p->func(p->shared, w->step, w->step? w->data : 0); // for the first step, input is NULL
|
||||
|
||||
// update step and let other workers know
|
||||
pthread_mutex_lock(&p->mutex);
|
||||
w->step = w->step == p->n_steps - 1 || w->data? (w->step + 1) % p->n_steps : p->n_steps;
|
||||
w->running = 0;
|
||||
pthread_cond_broadcast(&p->cv);
|
||||
pthread_mutex_unlock(&p->mutex);
|
||||
}
|
||||
pthread_exit(0);
|
||||
}
|
||||
|
||||
void kt_pipeline(int n_threads, void *(*func)(void*, int, void*), void *shared_data, int n_steps)
|
||||
{
|
||||
ktp_t aux;
|
||||
pthread_t *tid;
|
||||
int i;
|
||||
|
||||
if (n_threads < 1) n_threads = 1;
|
||||
aux.n_workers = n_threads;
|
||||
aux.n_steps = n_steps;
|
||||
aux.func = func;
|
||||
aux.shared = shared_data;
|
||||
pthread_mutex_init(&aux.mutex, 0);
|
||||
pthread_cond_init(&aux.cv, 0);
|
||||
|
||||
aux.workers = alloca(n_threads * sizeof(ktp_worker_t));
|
||||
for (i = 0; i < n_threads; ++i) {
|
||||
ktp_worker_t *w = &aux.workers[i];
|
||||
w->step = w->running = 0; w->pl = &aux; w->data = 0;
|
||||
}
|
||||
|
||||
tid = alloca(n_threads * sizeof(pthread_t));
|
||||
for (i = 0; i < n_threads; ++i) pthread_create(&tid[i], 0, ktp_worker, &aux.workers[i]);
|
||||
for (i = 0; i < n_threads; ++i) pthread_join(tid[i], 0);
|
||||
|
||||
pthread_mutex_destroy(&aux.mutex);
|
||||
pthread_cond_destroy(&aux.cv);
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue