r1044: fixed an out-of-order bug in kthread
This commit is contained in:
parent
163f949a33
commit
3ae437ac13
15
kthread.c
15
kthread.c
|
|
@ -67,13 +67,15 @@ struct ktp_t;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
struct ktp_t *pl;
|
struct ktp_t *pl;
|
||||||
int step, running;
|
int64_t index;
|
||||||
|
int step;
|
||||||
void *data;
|
void *data;
|
||||||
} ktp_worker_t;
|
} ktp_worker_t;
|
||||||
|
|
||||||
typedef struct ktp_t {
|
typedef struct ktp_t {
|
||||||
void *shared;
|
void *shared;
|
||||||
void *(*func)(void*, int, void*);
|
void *(*func)(void*, int, void*);
|
||||||
|
int64_t index;
|
||||||
int n_workers, n_steps;
|
int n_workers, n_steps;
|
||||||
ktp_worker_t *workers;
|
ktp_worker_t *workers;
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
|
|
@ -92,13 +94,12 @@ static void *ktp_worker(void *data)
|
||||||
// test whether another worker is doing the same step
|
// test whether another worker is doing the same step
|
||||||
for (i = 0; i < p->n_workers; ++i) {
|
for (i = 0; i < p->n_workers; ++i) {
|
||||||
if (w == &p->workers[i]) continue; // ignore itself
|
if (w == &p->workers[i]) continue; // ignore itself
|
||||||
if (p->workers[i].running && p->workers[i].step == w->step)
|
if (p->workers[i].step <= w->step && p->workers[i].index < w->index)
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (i == p->n_workers) break; // no other workers doing w->step; then this worker will
|
if (i == p->n_workers) break; // no workers with smaller indices are doing w->step or the previous steps
|
||||||
pthread_cond_wait(&p->cv, &p->mutex);
|
pthread_cond_wait(&p->cv, &p->mutex);
|
||||||
}
|
}
|
||||||
w->running = 1;
|
|
||||||
pthread_mutex_unlock(&p->mutex);
|
pthread_mutex_unlock(&p->mutex);
|
||||||
|
|
||||||
// working on w->step
|
// working on w->step
|
||||||
|
|
@ -107,7 +108,7 @@ static void *ktp_worker(void *data)
|
||||||
// update step and let other workers know
|
// update step and let other workers know
|
||||||
pthread_mutex_lock(&p->mutex);
|
pthread_mutex_lock(&p->mutex);
|
||||||
w->step = w->step == p->n_steps - 1 || w->data? (w->step + 1) % p->n_steps : p->n_steps;
|
w->step = w->step == p->n_steps - 1 || w->data? (w->step + 1) % p->n_steps : p->n_steps;
|
||||||
w->running = 0;
|
if (w->step == 0) w->index = p->index++;
|
||||||
pthread_cond_broadcast(&p->cv);
|
pthread_cond_broadcast(&p->cv);
|
||||||
pthread_mutex_unlock(&p->mutex);
|
pthread_mutex_unlock(&p->mutex);
|
||||||
}
|
}
|
||||||
|
|
@ -125,13 +126,15 @@ void kt_pipeline(int n_threads, void *(*func)(void*, int, void*), void *shared_d
|
||||||
aux.n_steps = n_steps;
|
aux.n_steps = n_steps;
|
||||||
aux.func = func;
|
aux.func = func;
|
||||||
aux.shared = shared_data;
|
aux.shared = shared_data;
|
||||||
|
aux.index = 0;
|
||||||
pthread_mutex_init(&aux.mutex, 0);
|
pthread_mutex_init(&aux.mutex, 0);
|
||||||
pthread_cond_init(&aux.cv, 0);
|
pthread_cond_init(&aux.cv, 0);
|
||||||
|
|
||||||
aux.workers = alloca(n_threads * sizeof(ktp_worker_t));
|
aux.workers = alloca(n_threads * sizeof(ktp_worker_t));
|
||||||
for (i = 0; i < n_threads; ++i) {
|
for (i = 0; i < n_threads; ++i) {
|
||||||
ktp_worker_t *w = &aux.workers[i];
|
ktp_worker_t *w = &aux.workers[i];
|
||||||
w->step = w->running = 0; w->pl = &aux; w->data = 0;
|
w->step = 0; w->pl = &aux; w->data = 0;
|
||||||
|
w->index = aux.index++;
|
||||||
}
|
}
|
||||||
|
|
||||||
tid = alloca(n_threads * sizeof(pthread_t));
|
tid = alloca(n_threads * sizeof(pthread_t));
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue