/*  thread_pool.c -- A pool of generic worker threads

    Copyright (c) 2013-2020 Genome Research Ltd.

    Author: James Bonfield <jkb@sanger.ac.uk>

Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
DEALINGS IN THE SOFTWARE.  */
|
||
|
|
|
||
|
|
#ifndef TEST_MAIN
|
||
|
|
#define HTS_BUILDING_LIBRARY // Enables HTSLIB_EXPORT, see htslib/hts_defs.h
|
||
|
|
#include <config.h>
|
||
|
|
#endif
|
||
|
|
|
||
|
|
#include <stdlib.h>
|
||
|
|
#include <inttypes.h>
|
||
|
|
#include <signal.h>
|
||
|
|
#include <errno.h>
|
||
|
|
#include <stdio.h>
|
||
|
|
#include <string.h>
|
||
|
|
#include <sys/time.h>
|
||
|
|
#include <assert.h>
|
||
|
|
#include <stdarg.h>
|
||
|
|
#include <unistd.h>
|
||
|
|
#include <limits.h>
|
||
|
|
|
||
|
|
#include "thread_pool_internal.h"
|
||
|
|
#include "htslib/hts_log.h"
|
||
|
|
|
||
|
|
// Minimum stack size for threads. Required for some rANS codecs
|
||
|
|
// that use over 2Mbytes of stack for encoder / decoder state
|
||
|
|
#define HTS_MIN_THREAD_STACK (3 * 1024 * 1024)
|
||
|
|
|
||
|
|
static void hts_tpool_process_detach_locked(hts_tpool *p,
|
||
|
|
hts_tpool_process *q);
|
||
|
|
|
||
|
|
//#define DEBUG
|
||
|
|
|
||
|
|
#ifdef DEBUG
|
||
|
|
/*
 * Debug helper: returns the index in p->t[] whose thread id matches the
 * calling thread, or -1 if no entry matches.
 */
static int worker_id(hts_tpool *p) {
    const pthread_t self = pthread_self();
    int idx = 0;

    while (idx < p->tsize) {
        if (pthread_equal(self, p->t[idx].tid))
            return idx;
        idx++;
    }

    return -1;
}
|
||
|
|
|
||
|
|
/*
 * Debug helper: printf-style formatted output to the stream fp.
 *
 * Only compiled when DEBUG is defined; otherwise DBG_OUT is a no-op macro.
 * fmt is const-qualified as it is always called with string literals.
 */
void DBG_OUT(FILE *fp, const char *fmt, ...) {
    va_list args;

    va_start(args, fmt);
    vfprintf(fp, fmt, args);
    va_end(args);
}
|
||
|
|
#else
|
||
|
|
#define DBG_OUT(...) do{}while(0)
|
||
|
|
#endif
|
||
|
|
|
||
|
|
/* ----------------------------------------------------------------------------
|
||
|
|
* A process-queue to hold results from the thread pool.
|
||
|
|
*
|
||
|
|
* Each thread pool may have jobs of multiple types being queued up and
|
||
|
|
* interleaved, so we attach several job process-queues to a single pool.
|
||
|
|
*
|
||
|
|
* The jobs themselves are expected to push their results onto their
|
||
|
|
* appropriate results queue.
|
||
|
|
*/
|
||
|
|
|
||
|
|
/*
|
||
|
|
* Adds a result to the end of the process result queue.
|
||
|
|
*
|
||
|
|
* Returns 0 on success;
|
||
|
|
* -1 on failure
|
||
|
|
*/
|
||
|
|
static int hts_tpool_add_result(hts_tpool_job *j, void *data) {
|
||
|
|
hts_tpool_process *q = j->q;
|
||
|
|
hts_tpool_result *r;
|
||
|
|
|
||
|
|
pthread_mutex_lock(&q->p->pool_m);
|
||
|
|
|
||
|
|
DBG_OUT(stderr, "%d: Adding result to queue %p, serial %"PRId64", %d of %d\n",
|
||
|
|
worker_id(j->p), q, j->serial, q->n_output+1, q->qsize);
|
||
|
|
|
||
|
|
if (--q->n_processing == 0)
|
||
|
|
pthread_cond_signal(&q->none_processing_c);
|
||
|
|
|
||
|
|
/* No results queue is fine if we don't want any results back */
|
||
|
|
if (q->in_only) {
|
||
|
|
pthread_mutex_unlock(&q->p->pool_m);
|
||
|
|
return 0;
|
||
|
|
}
|
||
|
|
|
||
|
|
if (!(r = malloc(sizeof(*r)))) {
|
||
|
|
pthread_mutex_unlock(&q->p->pool_m);
|
||
|
|
hts_tpool_process_shutdown(q);
|
||
|
|
return -1;
|
||
|
|
}
|
||
|
|
|
||
|
|
r->next = NULL;
|
||
|
|
r->data = data;
|
||
|
|
r->result_cleanup = j->result_cleanup;
|
||
|
|
r->serial = j->serial;
|
||
|
|
|
||
|
|
q->n_output++;
|
||
|
|
if (q->output_tail) {
|
||
|
|
q->output_tail->next = r;
|
||
|
|
q->output_tail = r;
|
||
|
|
} else {
|
||
|
|
q->output_head = q->output_tail = r;
|
||
|
|
}
|
||
|
|
|
||
|
|
assert(r->serial >= q->next_serial // Or it will never be dequeued ...
|
||
|
|
|| q->next_serial == INT_MAX); // ... unless flush in progress.
|
||
|
|
if (r->serial == q->next_serial) {
|
||
|
|
DBG_OUT(stderr, "%d: Broadcasting result_avail (id %"PRId64")\n",
|
||
|
|
worker_id(j->p), r->serial);
|
||
|
|
pthread_cond_broadcast(&q->output_avail_c);
|
||
|
|
DBG_OUT(stderr, "%d: Broadcast complete\n", worker_id(j->p));
|
||
|
|
}
|
||
|
|
|
||
|
|
pthread_mutex_unlock(&q->p->pool_m);
|
||
|
|
|
||
|
|
return 0;
|
||
|
|
}
|
||
|
|
|
||
|
|
static void wake_next_worker(hts_tpool_process *q, int locked);
|
||
|
|
|
||
|
|
/* Core of hts_tpool_next_result() */
|
||
|
|
static hts_tpool_result *hts_tpool_next_result_locked(hts_tpool_process *q) {
|
||
|
|
hts_tpool_result *r, *last;
|
||
|
|
|
||
|
|
if (q->shutdown)
|
||
|
|
return NULL;
|
||
|
|
|
||
|
|
for (last = NULL, r = q->output_head; r; last = r, r = r->next) {
|
||
|
|
if (r->serial == q->next_serial)
|
||
|
|
break;
|
||
|
|
}
|
||
|
|
|
||
|
|
if (r) {
|
||
|
|
// Remove r from out linked list
|
||
|
|
if (q->output_head == r)
|
||
|
|
q->output_head = r->next;
|
||
|
|
else
|
||
|
|
last->next = r->next;
|
||
|
|
|
||
|
|
if (q->output_tail == r)
|
||
|
|
q->output_tail = last;
|
||
|
|
|
||
|
|
if (!q->output_head)
|
||
|
|
q->output_tail = NULL;
|
||
|
|
|
||
|
|
q->next_serial++;
|
||
|
|
q->n_output--;
|
||
|
|
|
||
|
|
if (q->qsize && q->n_output < q->qsize) {
|
||
|
|
// Not technically input full, but can guarantee there is
|
||
|
|
// room for the input to go somewhere so we still signal.
|
||
|
|
// The waiting code will then check the condition again.
|
||
|
|
if (q->n_input < q->qsize)
|
||
|
|
pthread_cond_signal(&q->input_not_full_c);
|
||
|
|
if (!q->shutdown)
|
||
|
|
wake_next_worker(q, 1);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
return r;
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
* Pulls the next item off the process result queue. The caller should free
|
||
|
|
* it (and any internals as appropriate) after use. This doesn't wait for a
|
||
|
|
* result to be present.
|
||
|
|
*
|
||
|
|
* Results will be returned in strict order.
|
||
|
|
*
|
||
|
|
* Returns hts_tpool_result pointer if a result is ready.
|
||
|
|
* NULL if not.
|
||
|
|
*/
|
||
|
|
hts_tpool_result *hts_tpool_next_result(hts_tpool_process *q) {
|
||
|
|
hts_tpool_result *r;
|
||
|
|
|
||
|
|
DBG_OUT(stderr, "Requesting next result on queue %p\n", q);
|
||
|
|
|
||
|
|
pthread_mutex_lock(&q->p->pool_m);
|
||
|
|
r = hts_tpool_next_result_locked(q);
|
||
|
|
pthread_mutex_unlock(&q->p->pool_m);
|
||
|
|
|
||
|
|
DBG_OUT(stderr, "(q=%p) Found %p\n", q, r);
|
||
|
|
|
||
|
|
return r;
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
* Pulls the next item off the process result queue. The caller should free
|
||
|
|
* it (and any internals as appropriate) after use. This will wait for
|
||
|
|
* a result to be present if none are currently available.
|
||
|
|
*
|
||
|
|
* Results will be returned in strict order.
|
||
|
|
*
|
||
|
|
* Returns hts_tpool_result pointer if a result is ready.
|
||
|
|
* NULL on error or during shutdown.
|
||
|
|
*/
|
||
|
|
hts_tpool_result *hts_tpool_next_result_wait(hts_tpool_process *q) {
|
||
|
|
hts_tpool_result *r;
|
||
|
|
|
||
|
|
pthread_mutex_lock(&q->p->pool_m);
|
||
|
|
while (!(r = hts_tpool_next_result_locked(q))) {
|
||
|
|
/* Possible race here now avoided via _locked() call, but in case... */
|
||
|
|
struct timeval now;
|
||
|
|
struct timespec timeout;
|
||
|
|
|
||
|
|
gettimeofday(&now, NULL);
|
||
|
|
timeout.tv_sec = now.tv_sec + 10;
|
||
|
|
timeout.tv_nsec = now.tv_usec * 1000;
|
||
|
|
|
||
|
|
q->ref_count++;
|
||
|
|
if (q->shutdown) {
|
||
|
|
int rc = --q->ref_count;
|
||
|
|
pthread_mutex_unlock(&q->p->pool_m);
|
||
|
|
if (rc == 0)
|
||
|
|
hts_tpool_process_destroy(q);
|
||
|
|
return NULL;
|
||
|
|
}
|
||
|
|
pthread_cond_timedwait(&q->output_avail_c, &q->p->pool_m, &timeout);
|
||
|
|
|
||
|
|
q->ref_count--;
|
||
|
|
}
|
||
|
|
pthread_mutex_unlock(&q->p->pool_m);
|
||
|
|
|
||
|
|
return r;
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
* Returns true if there are no items in the process results queue and
|
||
|
|
* also none still pending.
|
||
|
|
*/
|
||
|
|
int hts_tpool_process_empty(hts_tpool_process *q) {
|
||
|
|
int empty;
|
||
|
|
|
||
|
|
pthread_mutex_lock(&q->p->pool_m);
|
||
|
|
empty = q->n_input == 0 && q->n_processing == 0 && q->n_output == 0;
|
||
|
|
pthread_mutex_unlock(&q->p->pool_m);
|
||
|
|
|
||
|
|
return empty;
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
 * Increments the reference count of process-queue q, under the pool mutex.
 */
void hts_tpool_process_ref_incr(hts_tpool_process *q) {
    pthread_mutex_t *pool_lock = &q->p->pool_m;

    pthread_mutex_lock(pool_lock);
    q->ref_count += 1;
    pthread_mutex_unlock(pool_lock);
}
|
||
|
|
|
||
|
|
/*
 * Decrements the reference count of process-queue q.  If the count drops
 * to zero (or below), hts_tpool_process_destroy() is called on q after the
 * pool mutex has been released.
 */
void hts_tpool_process_ref_decr(hts_tpool_process *q) {
    int remaining;

    pthread_mutex_lock(&q->p->pool_m);
    remaining = --q->ref_count;
    pthread_mutex_unlock(&q->p->pool_m);

    if (remaining <= 0)
        hts_tpool_process_destroy(q);
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
* Returns the number of completed jobs in the process results queue.
|
||
|
|
*/
|
||
|
|
int hts_tpool_process_len(hts_tpool_process *q) {
|
||
|
|
int len;
|
||
|
|
|
||
|
|
pthread_mutex_lock(&q->p->pool_m);
|
||
|
|
len = q->n_output;
|
||
|
|
pthread_mutex_unlock(&q->p->pool_m);
|
||
|
|
|
||
|
|
return len;
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
* Returns the number of completed jobs in the process results queue plus the
|
||
|
|
* number running and queued up to run.
|
||
|
|
*/
|
||
|
|
int hts_tpool_process_sz(hts_tpool_process *q) {
|
||
|
|
int len;
|
||
|
|
|
||
|
|
pthread_mutex_lock(&q->p->pool_m);
|
||
|
|
len = q->n_output + q->n_input + q->n_processing;
|
||
|
|
pthread_mutex_unlock(&q->p->pool_m);
|
||
|
|
|
||
|
|
return len;
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
* Shutdown a process.
|
||
|
|
*
|
||
|
|
* This sets the shutdown flag and wakes any threads waiting on process
|
||
|
|
* condition variables.
|
||
|
|
*/
|
||
|
|
static void hts_tpool_process_shutdown_locked(hts_tpool_process *q) {
|
||
|
|
q->shutdown = 1;
|
||
|
|
pthread_cond_broadcast(&q->output_avail_c);
|
||
|
|
pthread_cond_broadcast(&q->input_not_full_c);
|
||
|
|
pthread_cond_broadcast(&q->input_empty_c);
|
||
|
|
pthread_cond_broadcast(&q->none_processing_c);
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
 * Shuts down process-queue q: takes the pool mutex, then sets the shutdown
 * flag and wakes all threads waiting on the queue's condition variables.
 */
void hts_tpool_process_shutdown(hts_tpool_process *q) {
    pthread_mutex_t *pool_lock = &q->p->pool_m;

    pthread_mutex_lock(pool_lock);
    hts_tpool_process_shutdown_locked(q);
    pthread_mutex_unlock(pool_lock);
}
|
||
|
|
|
||
|
|
/*
 * Returns the current value of the queue's shutdown flag, read under the
 * pool mutex.  Within this file the flag is 0 for a live queue, 1 after a
 * shutdown, and 2 after a worker reported a hard failure.
 */
int hts_tpool_process_is_shutdown(hts_tpool_process *q) {
    pthread_mutex_t *pool_lock = &q->p->pool_m;
    int state;

    pthread_mutex_lock(pool_lock);
    state = q->shutdown;
    pthread_mutex_unlock(pool_lock);

    return state;
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
* Frees a result 'r' and if free_data is true also frees
|
||
|
|
* the internal r->data result too.
|
||
|
|
*/
|
||
|
|
void hts_tpool_delete_result(hts_tpool_result *r, int free_data) {
|
||
|
|
if (!r)
|
||
|
|
return;
|
||
|
|
|
||
|
|
if (free_data && r->data)
|
||
|
|
free(r->data);
|
||
|
|
|
||
|
|
free(r);
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
* Returns the data portion of a hts_tpool_result, corresponding
|
||
|
|
* to the actual "result" itself.
|
||
|
|
*/
|
||
|
|
void *hts_tpool_result_data(hts_tpool_result *r) {
|
||
|
|
return r->data;
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
* Initialises a thread process-queue.
|
||
|
|
*
|
||
|
|
* In_only, if true, indicates that the process generates does not need to
|
||
|
|
* hold any output. Otherwise an output queue is used to store the results
|
||
|
|
* of processing each input job.
|
||
|
|
*
|
||
|
|
* Results hts_tpool_process pointer on success;
|
||
|
|
* NULL on failure
|
||
|
|
*/
|
||
|
|
hts_tpool_process *hts_tpool_process_init(hts_tpool *p, int qsize, int in_only) {
|
||
|
|
hts_tpool_process *q = malloc(sizeof(*q));
|
||
|
|
if (!q)
|
||
|
|
return NULL;
|
||
|
|
|
||
|
|
pthread_cond_init(&q->output_avail_c, NULL);
|
||
|
|
pthread_cond_init(&q->input_not_full_c, NULL);
|
||
|
|
pthread_cond_init(&q->input_empty_c, NULL);
|
||
|
|
pthread_cond_init(&q->none_processing_c,NULL);
|
||
|
|
|
||
|
|
q->p = p;
|
||
|
|
q->input_head = NULL;
|
||
|
|
q->input_tail = NULL;
|
||
|
|
q->output_head = NULL;
|
||
|
|
q->output_tail = NULL;
|
||
|
|
q->next_serial = 0;
|
||
|
|
q->curr_serial = 0;
|
||
|
|
q->no_more_input = 0;
|
||
|
|
q->n_input = 0;
|
||
|
|
q->n_output = 0;
|
||
|
|
q->n_processing= 0;
|
||
|
|
q->qsize = qsize;
|
||
|
|
q->in_only = in_only;
|
||
|
|
q->shutdown = 0;
|
||
|
|
q->wake_dispatch = 0;
|
||
|
|
q->ref_count = 1;
|
||
|
|
|
||
|
|
q->next = NULL;
|
||
|
|
q->prev = NULL;
|
||
|
|
|
||
|
|
hts_tpool_process_attach(p, q);
|
||
|
|
|
||
|
|
return q;
|
||
|
|
}
|
||
|
|
|
||
|
|
/* Deallocates memory for a thread process-queue.
|
||
|
|
* Must be called before the thread pool is destroyed.
|
||
|
|
*/
|
||
|
|
void hts_tpool_process_destroy(hts_tpool_process *q) {
|
||
|
|
DBG_OUT(stderr, "Destroying results queue %p\n", q);
|
||
|
|
|
||
|
|
if (!q)
|
||
|
|
return;
|
||
|
|
|
||
|
|
// Prevent dispatch from queuing up any more jobs.
|
||
|
|
// We want to reset (and flush) the queue here, before
|
||
|
|
// we set the shutdown flag, but we need to avoid races
|
||
|
|
// with queue more input during reset.
|
||
|
|
pthread_mutex_lock(&q->p->pool_m);
|
||
|
|
q->no_more_input = 1;
|
||
|
|
pthread_mutex_unlock(&q->p->pool_m);
|
||
|
|
|
||
|
|
// Ensure it's fully drained before destroying the queue
|
||
|
|
hts_tpool_process_reset(q, 0);
|
||
|
|
pthread_mutex_lock(&q->p->pool_m);
|
||
|
|
hts_tpool_process_detach_locked(q->p, q);
|
||
|
|
hts_tpool_process_shutdown_locked(q);
|
||
|
|
|
||
|
|
// Maybe a worker is scanning this queue, so delay destruction
|
||
|
|
if (--q->ref_count > 0) {
|
||
|
|
pthread_mutex_unlock(&q->p->pool_m);
|
||
|
|
return;
|
||
|
|
}
|
||
|
|
|
||
|
|
pthread_cond_destroy(&q->output_avail_c);
|
||
|
|
pthread_cond_destroy(&q->input_not_full_c);
|
||
|
|
pthread_cond_destroy(&q->input_empty_c);
|
||
|
|
pthread_cond_destroy(&q->none_processing_c);
|
||
|
|
pthread_mutex_unlock(&q->p->pool_m);
|
||
|
|
|
||
|
|
free(q);
|
||
|
|
|
||
|
|
DBG_OUT(stderr, "Destroyed results queue %p\n", q);
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
/*
|
||
|
|
* Attach and detach a thread process-queue with / from the thread pool
|
||
|
|
* scheduler.
|
||
|
|
*
|
||
|
|
* We need to do attach after making a thread process, but may also wish
|
||
|
|
* to temporarily detach if we wish to stop running jobs on a specific
|
||
|
|
* process while permitting other process to continue.
|
||
|
|
*/
|
||
|
|
void hts_tpool_process_attach(hts_tpool *p, hts_tpool_process *q) {
|
||
|
|
pthread_mutex_lock(&p->pool_m);
|
||
|
|
if (p->q_head) {
|
||
|
|
q->next = p->q_head;
|
||
|
|
q->prev = p->q_head->prev;
|
||
|
|
p->q_head->prev->next = q;
|
||
|
|
p->q_head->prev = q;
|
||
|
|
} else {
|
||
|
|
q->next = q;
|
||
|
|
q->prev = q;
|
||
|
|
}
|
||
|
|
p->q_head = q;
|
||
|
|
assert(p->q_head && p->q_head->prev && p->q_head->next);
|
||
|
|
pthread_mutex_unlock(&p->pool_m);
|
||
|
|
}
|
||
|
|
|
||
|
|
static void hts_tpool_process_detach_locked(hts_tpool *p,
|
||
|
|
hts_tpool_process *q) {
|
||
|
|
if (!p->q_head || !q->prev || !q->next)
|
||
|
|
return;
|
||
|
|
|
||
|
|
hts_tpool_process *curr = p->q_head, *first = curr;
|
||
|
|
do {
|
||
|
|
if (curr == q) {
|
||
|
|
q->next->prev = q->prev;
|
||
|
|
q->prev->next = q->next;
|
||
|
|
p->q_head = q->next;
|
||
|
|
q->next = q->prev = NULL;
|
||
|
|
|
||
|
|
// Last one
|
||
|
|
if (p->q_head == q)
|
||
|
|
p->q_head = NULL;
|
||
|
|
break;
|
||
|
|
}
|
||
|
|
|
||
|
|
curr = curr->next;
|
||
|
|
} while (curr != first);
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
 * Detaches process-queue q from pool p, taking the pool mutex around the
 * list manipulation.
 */
void hts_tpool_process_detach(hts_tpool *p, hts_tpool_process *q) {
    pthread_mutex_t *pool_lock = &p->pool_m;

    pthread_mutex_lock(pool_lock);
    hts_tpool_process_detach_locked(p, q);
    pthread_mutex_unlock(pool_lock);
}
|
||
|
|
|
||
|
|
|
||
|
|
/* ----------------------------------------------------------------------------
|
||
|
|
* The thread pool.
|
||
|
|
*/
|
||
|
|
|
||
|
|
// Difference t2 - t1 in microseconds; both arguments need tv_sec and tv_usec members.
#define TDIFF(t2,t1) (((t2).tv_sec - (t1).tv_sec) * 1000000 + (t2).tv_usec - (t1).tv_usec)
|
||
|
|
|
||
|
|
/*
|
||
|
|
* A worker thread.
|
||
|
|
*
|
||
|
|
* Once woken, each thread checks each process-queue in the pool in turn,
|
||
|
|
* looking for input jobs that also have room for the output (if it requires
|
||
|
|
* storing). If found, we execute it and repeat.
|
||
|
|
*
|
||
|
|
* If we checked all input queues and find no such job, then we wait until we
|
||
|
|
* are signalled to check again.
|
||
|
|
*/
|
||
|
|
static void *tpool_worker(void *arg) {
|
||
|
|
hts_tpool_worker *w = (hts_tpool_worker *)arg;
|
||
|
|
hts_tpool *p = w->p;
|
||
|
|
hts_tpool_job *j;
|
||
|
|
|
||
|
|
pthread_mutex_lock(&p->pool_m);
|
||
|
|
while (!p->shutdown) {
|
||
|
|
// Pop an item off the pool queue
|
||
|
|
|
||
|
|
assert(p->q_head == 0 || (p->q_head->prev && p->q_head->next));
|
||
|
|
|
||
|
|
int work_to_do = 0;
|
||
|
|
hts_tpool_process *first = p->q_head, *q = first;
|
||
|
|
do {
|
||
|
|
// Iterate over queues, finding one with jobs and also
|
||
|
|
// room to put the result.
|
||
|
|
//if (q && q->input_head && !hts_tpool_process_output_full(q)) {
|
||
|
|
if (q && q->input_head
|
||
|
|
&& q->qsize - q->n_output > q->n_processing
|
||
|
|
&& !q->shutdown) {
|
||
|
|
work_to_do = 1;
|
||
|
|
break;
|
||
|
|
}
|
||
|
|
|
||
|
|
if (q) q = q->next;
|
||
|
|
} while (q && q != first);
|
||
|
|
|
||
|
|
if (!work_to_do) {
|
||
|
|
// We scanned all queues and cannot process any, so we wait.
|
||
|
|
p->nwaiting++;
|
||
|
|
|
||
|
|
// Push this thread to the top of the waiting stack
|
||
|
|
if (p->t_stack_top == -1 || p->t_stack_top > w->idx)
|
||
|
|
p->t_stack_top = w->idx;
|
||
|
|
|
||
|
|
p->t_stack[w->idx] = 1;
|
||
|
|
// printf("%2d: no work. In=%d Proc=%d Out=%d full=%d\n",
|
||
|
|
// w->idx, p->q_head->n_input, p->q_head->n_processing, p->q_head->n_output,
|
||
|
|
// hts_tpool_process_output_full(p->q_head));
|
||
|
|
pthread_cond_wait(&w->pending_c, &p->pool_m);
|
||
|
|
p->t_stack[w->idx] = 0;
|
||
|
|
|
||
|
|
/* Find new t_stack_top */
|
||
|
|
int i;
|
||
|
|
p->t_stack_top = -1;
|
||
|
|
for (i = 0; i < p->tsize; i++) {
|
||
|
|
if (p->t_stack[i]) {
|
||
|
|
p->t_stack_top = i;
|
||
|
|
break;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
p->nwaiting--;
|
||
|
|
continue; // To outer loop.
|
||
|
|
}
|
||
|
|
|
||
|
|
// Otherwise work_to_do, so process as many items in this queue as
|
||
|
|
// possible before switching to another queue. This means threads
|
||
|
|
// often end up being dedicated to one type of work.
|
||
|
|
q->ref_count++;
|
||
|
|
while (q->input_head && q->qsize - q->n_output > q->n_processing) {
|
||
|
|
if (p->shutdown)
|
||
|
|
goto shutdown;
|
||
|
|
|
||
|
|
if (q->shutdown)
|
||
|
|
// Queue shutdown, but there may be other queues
|
||
|
|
break;
|
||
|
|
|
||
|
|
j = q->input_head;
|
||
|
|
assert(j->p == p);
|
||
|
|
|
||
|
|
if (!(q->input_head = j->next))
|
||
|
|
q->input_tail = NULL;
|
||
|
|
|
||
|
|
// Transitioning from full queue to not-full means we can wake up
|
||
|
|
// any blocked dispatch threads. We broadcast this as it's only
|
||
|
|
// happening once (on the transition) rather than every time we
|
||
|
|
// are below qsize.
|
||
|
|
// (I wish I could remember why io_lib rev 3660 changed this from
|
||
|
|
// == to >=, but keeping it just in case!)
|
||
|
|
q->n_processing++;
|
||
|
|
if (q->n_input-- >= q->qsize)
|
||
|
|
pthread_cond_broadcast(&q->input_not_full_c);
|
||
|
|
|
||
|
|
if (q->n_input == 0)
|
||
|
|
pthread_cond_signal(&q->input_empty_c);
|
||
|
|
|
||
|
|
p->njobs--; // Total number of jobs; used to adjust to CPU scaling
|
||
|
|
|
||
|
|
pthread_mutex_unlock(&p->pool_m);
|
||
|
|
|
||
|
|
DBG_OUT(stderr, "%d: Processing queue %p, serial %"PRId64"\n",
|
||
|
|
worker_id(j->p), q, j->serial);
|
||
|
|
|
||
|
|
if (hts_tpool_add_result(j, j->func(j->arg)) < 0)
|
||
|
|
goto err;
|
||
|
|
//memset(j, 0xbb, sizeof(*j));
|
||
|
|
free(j);
|
||
|
|
|
||
|
|
pthread_mutex_lock(&p->pool_m);
|
||
|
|
}
|
||
|
|
if (--q->ref_count == 0) { // we were the last user
|
||
|
|
hts_tpool_process_destroy(q);
|
||
|
|
} else {
|
||
|
|
// Out of jobs on this queue, so restart search from next one.
|
||
|
|
// This is equivalent to "work-stealing".
|
||
|
|
if (p->q_head)
|
||
|
|
p->q_head = p->q_head->next;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
shutdown:
|
||
|
|
pthread_mutex_unlock(&p->pool_m);
|
||
|
|
#ifdef DEBUG
|
||
|
|
fprintf(stderr, "%d: Shutting down\n", worker_id(p));
|
||
|
|
#endif
|
||
|
|
return NULL;
|
||
|
|
|
||
|
|
err:
|
||
|
|
#ifdef DEBUG
|
||
|
|
fprintf(stderr, "%d: Failed to add result\n", worker_id(p));
|
||
|
|
#endif
|
||
|
|
// Hard failure, so shutdown all queues
|
||
|
|
pthread_mutex_lock(&p->pool_m);
|
||
|
|
hts_tpool_process *first = p->q_head, *q = first;
|
||
|
|
if (q) {
|
||
|
|
do {
|
||
|
|
hts_tpool_process_shutdown_locked(q);
|
||
|
|
q->shutdown = 2; // signify error.
|
||
|
|
q = q->next;
|
||
|
|
} while (q != first);
|
||
|
|
}
|
||
|
|
pthread_mutex_unlock(&p->pool_m);
|
||
|
|
return NULL;
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
 * Makes q the pool's current queue (p->q_head) and, when it looks
 * worthwhile, signals the idle worker recorded at p->t_stack_top.
 *
 * locked should be non-zero if the caller already holds q->p->pool_m;
 * otherwise the mutex is taken for the duration of the call.
 * A NULL q is ignored.
 */
static void wake_next_worker(hts_tpool_process *q, int locked) {
    if (!q) return;
    hts_tpool *p = q->p;
    if (!locked)
        pthread_mutex_lock(&p->pool_m);

    // Update the q_head to be this queue so we'll start processing
    // the queue we know to have results.
    assert(q->prev && q->next); // attached
    p->q_head = q;

    // Wake up if we have more jobs waiting than CPUs. This partially combats
    // CPU frequency scaling effects.  Starting too many threads and then
    // running out of jobs can cause each thread to have lots of start/stop
    // cycles, which then translates often to CPU frequency scaling
    // adjustments.  Instead it is better to only start as many threads as we
    // need to keep the throughput up, meaning some threads run flat out and
    // others are idle.
    //
    // This isn't perfect as we need to know how many can actually start,
    // rather than how many are waiting.  A limit on output queue size makes
    // these two figures different.
    assert(p->njobs >= q->n_input);

    int running = p->tsize - p->nwaiting;
    int sig = p->t_stack_top >= 0 && p->njobs > running
        && (q->n_processing < q->qsize - q->n_output);

//#define AVG_USAGE
#ifdef AVG_USAGE
    // Track average number of running threads and try to keep close.
    // We permit this to change, but slowly.  This avoids "boom and bust" cycles
    // where we read a lot of data, start a lot of jobs, then become idle again.
    // This way some threads run steadily and others dormant, which is better
    // for throughput.
    //
    // It's 50:50 if this is a good thing.  It helps some tasks quite significantly
    // while slightly hindering other (perhaps more usual) jobs.

    if (++p->n_count == 256) {
        p->n_count >>= 1;
        p->n_running >>= 1;
    }
    p->n_running += running;
    // Built in lag to avoid see-sawing.  Is this safe in all cases?
    if (sig && p->n_count >= 128 && running*p->n_count > p->n_running+1) sig=0;
#endif

    if (sig)
        pthread_cond_signal(&p->t[p->t_stack_top].pending_c);

    if (!locked)
        pthread_mutex_unlock(&p->pool_m);
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
* Creates a worker pool with n worker threads.
|
||
|
|
*
|
||
|
|
* Returns pool pointer on success;
|
||
|
|
* NULL on failure
|
||
|
|
*/
|
||
|
|
hts_tpool *hts_tpool_init(int n) {
|
||
|
|
int t_idx = 0;
|
||
|
|
size_t stack_size = 0;
|
||
|
|
pthread_attr_t pattr;
|
||
|
|
int pattr_init_done = 0;
|
||
|
|
hts_tpool *p = malloc(sizeof(*p));
|
||
|
|
if (!p)
|
||
|
|
return NULL;
|
||
|
|
p->tsize = n;
|
||
|
|
p->njobs = 0;
|
||
|
|
p->nwaiting = 0;
|
||
|
|
p->shutdown = 0;
|
||
|
|
p->q_head = NULL;
|
||
|
|
p->t_stack = NULL;
|
||
|
|
p->n_count = 0;
|
||
|
|
p->n_running = 0;
|
||
|
|
p->t = malloc(n * sizeof(p->t[0]));
|
||
|
|
if (!p->t) {
|
||
|
|
free(p);
|
||
|
|
return NULL;
|
||
|
|
}
|
||
|
|
p->t_stack = malloc(n * sizeof(*p->t_stack));
|
||
|
|
if (!p->t_stack) {
|
||
|
|
free(p->t);
|
||
|
|
free(p);
|
||
|
|
return NULL;
|
||
|
|
}
|
||
|
|
p->t_stack_top = -1;
|
||
|
|
|
||
|
|
pthread_mutexattr_t attr;
|
||
|
|
pthread_mutexattr_init(&attr);
|
||
|
|
pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
|
||
|
|
pthread_mutex_init(&p->pool_m, &attr);
|
||
|
|
pthread_mutexattr_destroy(&attr);
|
||
|
|
|
||
|
|
pthread_mutex_lock(&p->pool_m);
|
||
|
|
|
||
|
|
// Ensure new threads have a reasonably large stack. On some platforms,
|
||
|
|
// for example MacOS which defaults to 512Kb, this is not big enough
|
||
|
|
// for some of the rANS codecs.
|
||
|
|
|
||
|
|
if (pthread_attr_init(&pattr) < 0)
|
||
|
|
goto cleanup;
|
||
|
|
pattr_init_done = 1;
|
||
|
|
if (pthread_attr_getstacksize(&pattr, &stack_size) < 0)
|
||
|
|
goto cleanup;
|
||
|
|
if (stack_size < HTS_MIN_THREAD_STACK) {
|
||
|
|
if (pthread_attr_setstacksize(&pattr, HTS_MIN_THREAD_STACK) < 0)
|
||
|
|
goto cleanup;
|
||
|
|
}
|
||
|
|
|
||
|
|
for (t_idx = 0; t_idx < n; t_idx++) {
|
||
|
|
hts_tpool_worker *w = &p->t[t_idx];
|
||
|
|
p->t_stack[t_idx] = 0;
|
||
|
|
w->p = p;
|
||
|
|
w->idx = t_idx;
|
||
|
|
pthread_cond_init(&w->pending_c, NULL);
|
||
|
|
if (0 != pthread_create(&w->tid, &pattr, tpool_worker, w))
|
||
|
|
goto cleanup;
|
||
|
|
}
|
||
|
|
|
||
|
|
pthread_mutex_unlock(&p->pool_m);
|
||
|
|
pthread_attr_destroy(&pattr);
|
||
|
|
|
||
|
|
return p;
|
||
|
|
|
||
|
|
cleanup: {
|
||
|
|
// Any threads started will be waiting for p->pool_m, so we can
|
||
|
|
// stop them cleanly by setting p->shutdown, releasing the mutex and
|
||
|
|
// waiting for them to finish.
|
||
|
|
int j;
|
||
|
|
int save_errno = errno;
|
||
|
|
hts_log_error("Couldn't start thread pool worker : %s",
|
||
|
|
strerror(errno));
|
||
|
|
p->shutdown = 1;
|
||
|
|
pthread_mutex_unlock(&p->pool_m);
|
||
|
|
for (j = 0; j < t_idx; j++) {
|
||
|
|
pthread_join(p->t[j].tid, NULL);
|
||
|
|
pthread_cond_destroy(&p->t[j].pending_c);
|
||
|
|
}
|
||
|
|
pthread_mutex_destroy(&p->pool_m);
|
||
|
|
if (pattr_init_done)
|
||
|
|
pthread_attr_destroy(&pattr);
|
||
|
|
free(p->t_stack);
|
||
|
|
free(p->t);
|
||
|
|
free(p);
|
||
|
|
errno = save_errno;
|
||
|
|
return NULL;
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
* Returns the number of requested threads for a pool.
|
||
|
|
*/
|
||
|
|
int hts_tpool_size(hts_tpool *p) {
|
||
|
|
return p->tsize;
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
* Adds an item to the work pool.
|
||
|
|
*
|
||
|
|
* Returns 0 on success
|
||
|
|
* -1 on failure
|
||
|
|
*/
|
||
|
|
int hts_tpool_dispatch(hts_tpool *p, hts_tpool_process *q,
|
||
|
|
void *(*func)(void *arg), void *arg) {
|
||
|
|
return hts_tpool_dispatch3(p, q, func, arg, NULL, NULL, 0);
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
* As above but optional non-block flag.
|
||
|
|
*
|
||
|
|
* nonblock 0 => block if input queue is full
|
||
|
|
* nonblock +1 => don't block if input queue is full, but do not add task
|
||
|
|
* nonblock -1 => add task regardless of whether queue is full (over-size)
|
||
|
|
*/
|
||
|
|
int hts_tpool_dispatch2(hts_tpool *p, hts_tpool_process *q,
|
||
|
|
void *(*func)(void *arg), void *arg, int nonblock) {
|
||
|
|
return hts_tpool_dispatch3(p, q, func, arg, NULL, NULL, nonblock);
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
 * Adds a job running exec_func(arg) to the input list of process-queue q.
 *
 * job_cleanup and result_cleanup are recorded on the job; result_cleanup
 * is later copied onto the job's result.
 * NOTE(review): presumably these are invoked when unprocessed jobs or
 * unconsumed results are discarded -- the code doing so is not in view.
 *
 * nonblock  0 => block if input queue is full
 * nonblock +1 => don't block if input queue is full, but do not add task
 * nonblock -1 => add task regardless of whether queue is full (over-size)
 *
 * Returns 0 on success;
 *        -1 on failure.  errno is set to EAGAIN when nonblock is +1 and
 *           the queue is full or no longer accepting input.
 */
int hts_tpool_dispatch3(hts_tpool *p, hts_tpool_process *q,
                        void *(*exec_func)(void *arg), void *arg,
                        void (*job_cleanup)(void *arg),
                        void (*result_cleanup)(void *data),
                        int nonblock) {
    hts_tpool_job *job;

    pthread_mutex_lock(&p->pool_m);

    DBG_OUT(stderr, "Dispatching job for queue %p, serial %"PRId64"\n",
            q, q->curr_serial);

    if (nonblock == 1 && (q->no_more_input || q->n_input >= q->qsize)) {
        pthread_mutex_unlock(&p->pool_m);
        errno = EAGAIN;
        return -1;
    }

    job = malloc(sizeof(*job));
    if (job == NULL) {
        pthread_mutex_unlock(&p->pool_m);
        return -1;
    }
    job->p = p;
    job->q = q;
    job->func = exec_func;
    job->arg = arg;
    job->job_cleanup = job_cleanup;
    job->result_cleanup = result_cleanup;
    job->next = NULL;
    job->serial = q->curr_serial++;

    if (nonblock == 0) {
        // Sleep until there is room, or until told to stop waiting.
        for (;;) {
            int no_room = q->no_more_input || q->n_input >= q->qsize;
            if (!no_room || q->shutdown || q->wake_dispatch)
                break;
            pthread_cond_wait(&q->input_not_full_c, &q->p->pool_m);
        }

        if (q->no_more_input || q->shutdown) {
            free(job);
            pthread_mutex_unlock(&p->pool_m);
            return -1;
        }

        // A wake request makes this one call behave as non-blocking.
        if (q->wake_dispatch)
            q->wake_dispatch = 0;
    }

    p->njobs++;    // total across all queues
    q->n_input++;  // queue specific

    // Append to the tail of the input list.
    if (q->input_tail)
        q->input_tail->next = job;
    else
        q->input_head = job;
    q->input_tail = job;

    DBG_OUT(stderr, "Dispatched (serial %"PRId64")\n", job->serial);

    // Let a worker know we have data.
    // Keep incoming queue at 1 per running thread, so there is always
    // something waiting when they end their current task.  If we go above
    // this signal to start more threads (if available).  This has the effect
    // of concentrating jobs to fewer cores when we are I/O bound, which in
    // turn benefits systems with auto CPU frequency scaling.
    if (!q->shutdown)
        wake_next_worker(q, 1);

    pthread_mutex_unlock(&p->pool_m);

    return 0;
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
* Wakes up a single thread stuck in dispatch and make it return with
|
||
|
|
* errno EAGAIN.
|
||
|
|
*/
|
||
|
|
void hts_tpool_wake_dispatch(hts_tpool_process *q) {
|
||
|
|
pthread_mutex_lock(&q->p->pool_m);
|
||
|
|
q->wake_dispatch = 1;
|
||
|
|
pthread_cond_signal(&q->input_not_full_c);
|
||
|
|
pthread_mutex_unlock(&q->p->pool_m);
|
||
|
|
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
* Flushes the process-queue, but doesn't exit. This simply drains the queue
|
||
|
|
* and ensures all worker threads have finished their current tasks
|
||
|
|
* associated with this process.
|
||
|
|
*
|
||
|
|
* NOTE: This does not mean the worker threads are not executing jobs in
|
||
|
|
* another process-queue.
|
||
|
|
*
|
||
|
|
* Returns 0 on success;
|
||
|
|
* -1 on failure
|
||
|
|
*/
|
||
|
|
/* Fills *ts with an absolute deadline one second after the current time. */
static void tpool_flush_deadline(struct timespec *ts) {
    struct timeval tv;

    gettimeofday(&tv, NULL);
    ts->tv_sec  = tv.tv_sec + 1;
    ts->tv_nsec = tv.tv_usec * 1000;   // microseconds -> nanoseconds
}

int hts_tpool_process_flush(hts_tpool_process *q) {
    hts_tpool *pool = q->p;
    struct timespec deadline;
    int slot;

    DBG_OUT(stderr, "Flushing pool %p\n", pool);

    // All queue counters below are read and modified under the pool lock.
    pthread_mutex_lock(&pool->pool_m);

    // Signal the pending condition of every thread whose t_stack entry
    // is set, so they all take part in draining the queue.
    for (slot = 0; slot < pool->tsize; slot++) {
        if (!pool->t_stack[slot])
            continue;
        pthread_cond_signal(&pool->t[slot].pending_c);
    }

    // Make sure the queue is large enough to hold everything that is
    // queued, running or already produced.  The worker's
    // "q->qsize - q->n_output > n_processing" check could otherwise
    // deadlock during the flush.
    if (q->n_output + q->n_input + q->n_processing > q->qsize)
        q->qsize = q->n_output + q->n_input + q->n_processing;

    // A shut-down queue launches nothing new, but jobs already being
    // processed must still be waited for.
    if (q->shutdown) {
        while (q->n_processing)
            pthread_cond_wait(&q->none_processing_c, &pool->pool_m);
    }

    // Otherwise wait until both n_input and n_processing reach zero.
    for (;;) {
        if (q->shutdown || !(q->n_input || q->n_processing))
            break;

        // Phase 1: queued input; each wait is bounded by a 1s deadline.
        while (q->n_input && !q->shutdown) {
            tpool_flush_deadline(&deadline);
            pthread_cond_timedwait(&q->input_empty_c, &pool->pool_m,
                                   &deadline);
        }

        // Phase 2: running jobs.  This is waited for even when
        // q->shutdown is set, because returning while jobs still run
        // would let the caller free data that is being worked on.
        while (q->n_processing) {
            tpool_flush_deadline(&deadline);
            pthread_cond_timedwait(&q->none_processing_c, &pool->pool_m,
                                   &deadline);
        }

        if (q->shutdown)
            break;
    }

    pthread_mutex_unlock(&pool->pool_m);

    DBG_OUT(stderr, "Flushed complete for pool %p, queue %p\n", pool, q);

    return 0;
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
* Resets a process to the initial state.
|
||
|
|
*
|
||
|
|
* This removes any queued up input jobs, disables any notification of
|
||
|
|
* new results/output, flushes what is left and then discards any
|
||
|
|
* queued output.  Any consumer stuck in a wait for results to
|
||
|
|
* appear should stay stuck and will only wake up when new data is
|
||
|
|
* pushed through the queue.
|
||
|
|
*
|
||
|
|
* Returns 0 on success;
|
||
|
|
* -1 on failure
|
||
|
|
*/
|
||
|
|
int hts_tpool_process_reset(hts_tpool_process *q, int free_results) {
|
||
|
|
hts_tpool_job *j, *jn, *j_head;
|
||
|
|
hts_tpool_result *r, *rn, *r_head;
|
||
|
|
|
||
|
|
pthread_mutex_lock(&q->p->pool_m);
|
||
|
|
// prevent next_result from returning data during our flush
|
||
|
|
q->next_serial = INT_MAX;
|
||
|
|
|
||
|
|
// Remove any queued input not yet being acted upon
|
||
|
|
j_head = q->input_head;
|
||
|
|
q->input_head = q->input_tail = NULL;
|
||
|
|
q->n_input = 0;
|
||
|
|
|
||
|
|
// Remove any queued output, thus ensuring we have room to flush.
|
||
|
|
r_head = q->output_head;
|
||
|
|
q->output_head = q->output_tail = NULL;
|
||
|
|
q->n_output = 0;
|
||
|
|
pthread_mutex_unlock(&q->p->pool_m);
|
||
|
|
|
||
|
|
// Release memory. This can be done unlocked now the lists have been
|
||
|
|
// removed from the queue
|
||
|
|
for (j = j_head; j; j = jn) {
|
||
|
|
jn = j->next;
|
||
|
|
if (j->job_cleanup) j->job_cleanup(j->arg);
|
||
|
|
free(j);
|
||
|
|
}
|
||
|
|
|
||
|
|
for (r = r_head; r; r = rn) {
|
||
|
|
rn = r->next;
|
||
|
|
if (r->result_cleanup) {
|
||
|
|
r->result_cleanup(r->data);
|
||
|
|
r->data = NULL;
|
||
|
|
}
|
||
|
|
hts_tpool_delete_result(r, free_results);
|
||
|
|
}
|
||
|
|
|
||
|
|
// Wait for any jobs being processed to complete.
|
||
|
|
// (TODO: consider how to cancel any currently processing jobs.
|
||
|
|
// Probably this is too hard.)
|
||
|
|
if (hts_tpool_process_flush(q) != 0)
|
||
|
|
return -1;
|
||
|
|
|
||
|
|
// Remove any new output.
|
||
|
|
pthread_mutex_lock(&q->p->pool_m);
|
||
|
|
r_head = q->output_head;
|
||
|
|
q->output_head = q->output_tail = NULL;
|
||
|
|
q->n_output = 0;
|
||
|
|
|
||
|
|
// Finally reset the serial back to the starting point.
|
||
|
|
q->next_serial = q->curr_serial = 0;
|
||
|
|
pthread_cond_signal(&q->input_not_full_c);
|
||
|
|
pthread_mutex_unlock(&q->p->pool_m);
|
||
|
|
|
||
|
|
// Discard unwanted output
|
||
|
|
for (r = r_head; r; r = rn) {
|
||
|
|
//fprintf(stderr, "Discard output %d\n", r->serial);
|
||
|
|
rn = r->next;
|
||
|
|
if (r->result_cleanup) {
|
||
|
|
r->result_cleanup(r->data);
|
||
|
|
r->data = NULL;
|
||
|
|
}
|
||
|
|
hts_tpool_delete_result(r, free_results);
|
||
|
|
}
|
||
|
|
|
||
|
|
return 0;
|
||
|
|
}
|
||
|
|
|
||
|
|
/* Returns the process queue size */
|
||
|
|
int hts_tpool_process_qsize(hts_tpool_process *q) {
    // Plain read of the field; no lock is taken here.
    int size = q->qsize;

    return size;
}
|
||
|
|
|
||
|
|
/*
|
||
|
|
* Destroys a thread pool. The threads are joined into the main
|
||
|
|
* thread so they will finish their current work load.
|
||
|
|
*/
|
||
|
|
void hts_tpool_destroy(hts_tpool *p) {
    int idx;

    DBG_OUT(stderr, "Destroying pool %p\n", p);

    // Raise the shutdown flag and signal every worker's pending
    // condition, all under the pool lock.
    pthread_mutex_lock(&p->pool_m);
    p->shutdown = 1;

    DBG_OUT(stderr, "Sending shutdown request\n");

    idx = 0;
    while (idx < p->tsize) {
        pthread_cond_signal(&p->t[idx].pending_c);
        idx++;
    }

    pthread_mutex_unlock(&p->pool_m);

    DBG_OUT(stderr, "Shutdown complete\n");

    // Wait for every worker thread to exit.
    idx = 0;
    while (idx < p->tsize) {
        pthread_join(p->t[idx].tid, NULL);
        idx++;
    }

    // With all workers joined, the synchronisation objects and the
    // pool's memory are released.
    pthread_mutex_destroy(&p->pool_m);

    idx = 0;
    while (idx < p->tsize) {
        pthread_cond_destroy(&p->t[idx].pending_c);
        idx++;
    }

    free(p->t_stack);   // free(NULL) is a no-op
    free(p->t);
    free(p);

    DBG_OUT(stderr, "Destroyed pool %p\n", p);
}
|
||
|
|
|
||
|
|
|
||
|
|
/*
|
||
|
|
* Destroys a thread pool without waiting on jobs to complete.
|
||
|
|
* Use hts_tpool_kill(p) to quickly exit after a fatal error.
|
||
|
|
*/
|
||
|
|
void hts_tpool_kill(hts_tpool *p) {
    int i;

    // The old message passed the libc function kill() (from <signal.h>)
    // to a "%d" conversion, which is undefined behaviour whenever
    // DBG_OUT is enabled.  This function is the kill path, so the value
    // is spelled out instead.  "%p" also requires a void pointer.
    DBG_OUT(stderr, "Destroying pool %p, kill=1\n", (void *)p);

    // Send SIGINT to each worker thread.  The threads are not joined.
    for (i = 0; i < p->tsize; i++)
        pthread_kill(p->t[i].tid, SIGINT);

    // Release the synchronisation objects and the pool's memory.
    pthread_mutex_destroy(&p->pool_m);
    for (i = 0; i < p->tsize; i++)
        pthread_cond_destroy(&p->t[i].pending_c);

    free(p->t_stack);   // free(NULL) is a no-op
    free(p->t);
    free(p);

    // Only the address is printed here; the pool is no longer valid.
    DBG_OUT(stderr, "Destroyed pool %p\n", (void *)p);
}
|
||
|
|
|
||
|
|
|
||
|
|
/*=============================================================================
|
||
|
|
* Test app.
|
||
|
|
*
|
||
|
|
* This can be considered both as a basic test and as a worked example for
|
||
|
|
* various usage patterns.
|
||
|
|
*=============================================================================
|
||
|
|
*/
|
||
|
|
|
||
|
|
#ifdef TEST_MAIN
|
||
|
|
|
||
|
|
#include <stdio.h>
|
||
|
|
|
||
|
|
/* Number of jobs each test submits; can be overridden at compile time. */
#if !defined(TASK_SIZE)
#  define TASK_SIZE 1000
#endif
|
||
|
|
|
||
|
|
/*-----------------------------------------------------------------------------
|
||
|
|
* Unordered x -> x*x test.
|
||
|
|
* Results arrive in order of completion.
|
||
|
|
*/
|
||
|
|
/*
 * Job callback for the unordered test.
 *
 * arg points to a heap-allocated int.  The square of that int is printed
 * and arg is freed.  Always returns NULL.
 */
void *doit_square_u(void *arg) {
    int *input = arg;
    int value = *input;

    // Random delay of under 0.1s so that jobs complete out of order.
    usleep(random() % 100000);

    printf("RESULT: %d\n", value * value);

    free(input);
    return NULL;
}
|
||
|
|
|
||
|
|
int test_square_u(int n) {
|
||
|
|
hts_tpool *p = hts_tpool_init(n);
|
||
|
|
hts_tpool_process *q = hts_tpool_process_init(p, n*2, 1);
|
||
|
|
int i;
|
||
|
|
|
||
|
|
// Dispatch jobs
|
||
|
|
for (i = 0; i < TASK_SIZE; i++) {
|
||
|
|
int *ip = malloc(sizeof(*ip));
|
||
|
|
*ip = i;
|
||
|
|
hts_tpool_dispatch(p, q, doit_square_u, ip);
|
||
|
|
}
|
||
|
|
|
||
|
|
hts_tpool_process_flush(q);
|
||
|
|
hts_tpool_process_destroy(q);
|
||
|
|
hts_tpool_destroy(p);
|
||
|
|
|
||
|
|
return 0;
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
/*-----------------------------------------------------------------------------
|
||
|
|
* Ordered x -> x*x test.
|
||
|
|
* Results arrive in numerical order.
|
||
|
|
*
|
||
|
|
* This implementation uses a non-blocking dispatch to avoid dead-locks
|
||
|
|
* where one job takes too long to complete.
|
||
|
|
*/
|
||
|
|
/*
 * Job callback for the ordered tests.
 *
 * arg points to a heap-allocated int, which is freed here.  Returns a
 * newly allocated int holding the square of the input, negated when the
 * input is negative.
 */
void *doit_square(void *arg) {
    int *input = arg;
    int value = *input;
    int *result;

    // Jobs whose low five bits are all set get an extra 0.5s delay, to
    // stress output-queue filling and heavily out-of-order completion.
    usleep(500000 * ((value&31)==31) + random() % 10000);

    result = malloc(sizeof(*result));
    if (value < 0)
        *result = -value*value;
    else
        *result = value*value;

    free(input);
    return result;
}
|
||
|
|
|
||
|
|
int test_square(int n) {
|
||
|
|
hts_tpool *p = hts_tpool_init(n);
|
||
|
|
hts_tpool_process *q = hts_tpool_process_init(p, n*2, 0);
|
||
|
|
int i;
|
||
|
|
hts_tpool_result *r;
|
||
|
|
|
||
|
|
// Dispatch jobs
|
||
|
|
for (i = 0; i < TASK_SIZE; i++) {
|
||
|
|
int *ip = malloc(sizeof(*ip));
|
||
|
|
*ip = i;
|
||
|
|
int blk;
|
||
|
|
|
||
|
|
do {
|
||
|
|
// In the situation where some jobs take much longer than
|
||
|
|
// others, we could end up blocking here as we haven't got
|
||
|
|
// any room in the output queue to place it. (We don't launch a
|
||
|
|
// job if the output queue is full.)
|
||
|
|
|
||
|
|
// This happens when the next serial number to fetch is, eg, 50
|
||
|
|
// but jobs 51-100 have all executed really fast and appeared in
|
||
|
|
// the output queue before 50. A dispatch & check-results
|
||
|
|
// alternating loop can fail to find job 50 many times over until
|
||
|
|
// eventually the dispatch blocks before it arrives.
|
||
|
|
|
||
|
|
// Our solution is to dispatch in non-blocking mode so we are
|
||
|
|
// always to either dispatch or consume a result.
|
||
|
|
blk = hts_tpool_dispatch2(p, q, doit_square, ip, 1);
|
||
|
|
|
||
|
|
// Check for results.
|
||
|
|
if ((r = hts_tpool_next_result(q))) {
|
||
|
|
printf("RESULT: %d\n", *(int *)hts_tpool_result_data(r));
|
||
|
|
hts_tpool_delete_result(r, 1);
|
||
|
|
}
|
||
|
|
if (blk == -1) {
|
||
|
|
// The alternative is a separate thread for dispatching and/or
|
||
|
|
// consumption of results. See test_squareB.
|
||
|
|
putchar('.'); fflush(stdout);
|
||
|
|
usleep(10000);
|
||
|
|
}
|
||
|
|
} while (blk == -1);
|
||
|
|
}
|
||
|
|
|
||
|
|
// Wait for any input-queued up jobs or in-progress jobs to complete.
|
||
|
|
hts_tpool_process_flush(q);
|
||
|
|
|
||
|
|
while ((r = hts_tpool_next_result(q))) {
|
||
|
|
printf("RESULT: %d\n", *(int *)hts_tpool_result_data(r));
|
||
|
|
hts_tpool_delete_result(r, 1);
|
||
|
|
}
|
||
|
|
|
||
|
|
hts_tpool_process_destroy(q);
|
||
|
|
hts_tpool_destroy(p);
|
||
|
|
|
||
|
|
return 0;
|
||
|
|
}
|
||
|
|
|
||
|
|
/*-----------------------------------------------------------------------------
|
||
|
|
* Ordered x -> x*x test.
|
||
|
|
* Results arrive in numerical order.
|
||
|
|
*
|
||
|
|
* This implementation uses separate dispatching threads and job consumption
|
||
|
|
* threads (main thread).  This means it can use blocking calls for
|
||
|
|
* simplicity elsewhere.
|
||
|
|
*/
|
||
|
|
struct squareB_opt {
|
||
|
|
hts_tpool *p;
|
||
|
|
hts_tpool_process *q;
|
||
|
|
int n;
|
||
|
|
};
|
||
|
|
static void *test_squareB_dispatcher(void *arg) {
|
||
|
|
struct squareB_opt *o = (struct squareB_opt *)arg;
|
||
|
|
int i, *ip;
|
||
|
|
|
||
|
|
for (i = 0; i < o->n; i++) {
|
||
|
|
ip = malloc(sizeof(*ip));
|
||
|
|
*ip = i;
|
||
|
|
|
||
|
|
hts_tpool_dispatch(o->p, o->q, doit_square, ip);
|
||
|
|
}
|
||
|
|
|
||
|
|
// Dispatch an sentinel job to mark the end
|
||
|
|
*(ip = malloc(sizeof(*ip))) = -1;
|
||
|
|
hts_tpool_dispatch(o->p, o->q, doit_square, ip);
|
||
|
|
pthread_exit(NULL);
|
||
|
|
}
|
||
|
|
|
||
|
|
int test_squareB(int n) {
|
||
|
|
hts_tpool *p = hts_tpool_init(n);
|
||
|
|
hts_tpool_process *q = hts_tpool_process_init(p, n*2, 0);
|
||
|
|
struct squareB_opt o = {p, q, TASK_SIZE};
|
||
|
|
pthread_t tid;
|
||
|
|
|
||
|
|
// Launch our job creation thread.
|
||
|
|
pthread_create(&tid, NULL, test_squareB_dispatcher, &o);
|
||
|
|
|
||
|
|
// Consume all results until we find the end-of-job marker.
|
||
|
|
for(;;) {
|
||
|
|
hts_tpool_result *r = hts_tpool_next_result_wait(q);
|
||
|
|
int x = *(int *)hts_tpool_result_data(r);
|
||
|
|
hts_tpool_delete_result(r, 1);
|
||
|
|
if (x == -1)
|
||
|
|
break;
|
||
|
|
printf("RESULT: %d\n", x);
|
||
|
|
}
|
||
|
|
|
||
|
|
// Wait for any input-queued up jobs or in-progress jobs to complete.
|
||
|
|
// This should do nothing as we've been executing until the termination
|
||
|
|
// marker of -1.
|
||
|
|
hts_tpool_process_flush(q);
|
||
|
|
assert(hts_tpool_next_result(q) == NULL);
|
||
|
|
|
||
|
|
hts_tpool_process_destroy(q);
|
||
|
|
hts_tpool_destroy(p);
|
||
|
|
pthread_join(tid, NULL);
|
||
|
|
|
||
|
|
return 0;
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
/*-----------------------------------------------------------------------------
|
||
|
|
* A simple pipeline test.
|
||
|
|
* We use a dedicated input thread that does the initial generation of job
|
||
|
|
* and dispatch, several execution steps running in a shared pool, and a
|
||
|
|
* dedicated output thread that prints out the final result.  It's key that our
|
||
|
|
* pipeline execution stages can run independently and don't themselves have
|
||
|
|
* any waits. To achieve this we therefore also use some dedicated threads
|
||
|
|
* that take the output from one queue and resubmit the job as the input to
|
||
|
|
* the next queue.
|
||
|
|
*
|
||
|
|
* More generally this could perhaps be a single pipeline thread that
|
||
|
|
* marshalls multiple queues and their interactions, but this is simply a
|
||
|
|
* demonstration of a single pipeline.
|
||
|
|
*
|
||
|
|
* Our process fills out the bottom byte of a 32-bit int and then shifts it
|
||
|
|
* left one byte at a time. Only the final stage needs to be ordered. Each
|
||
|
|
* stage uses its own queue.
|
||
|
|
*
|
||
|
|
* Possible improvement: we only need the last stage to be ordered. By
|
||
|
|
* allocating our own serial numbers for the first job and manually setting
|
||
|
|
* these serials in the last job, perhaps we can permit out of order execution
|
||
|
|
* of all the in-between stages. (I doubt it'll affect speed much though.)
|
||
|
|
*/
|
||
|
|
|
||
|
|
/* Forward declarations: the dedicated source and sink threads ... */
static void *pipe_input_thread(void *arg);
static void *pipe_output_thread(void *arg);

/* ... and the job callbacks run in the pool, one per pipeline stage. */
static void *pipe_stage1(void *arg);
static void *pipe_stage2(void *arg);
static void *pipe_stage3(void *arg);
|
||
|
|
|
||
|
|
typedef struct {
|
||
|
|
hts_tpool *p;
|
||
|
|
hts_tpool_process *q1;
|
||
|
|
hts_tpool_process *q2;
|
||
|
|
hts_tpool_process *q3;
|
||
|
|
int n;
|
||
|
|
} pipe_opt;
|
||
|
|
|
||
|
|
typedef struct {
|
||
|
|
pipe_opt *o;
|
||
|
|
unsigned int x;
|
||
|
|
int eof; // set with last job.
|
||
|
|
} pipe_job;
|
||
|
|
|
||
|
|
static void *pipe_input_thread(void *arg) {
|
||
|
|
pipe_opt *o = (pipe_opt *)arg;
|
||
|
|
|
||
|
|
int i;
|
||
|
|
for (i = 1; i <= o->n; i++) {
|
||
|
|
pipe_job *j = malloc(sizeof(*j));
|
||
|
|
j->o = o;
|
||
|
|
j->x = i;
|
||
|
|
j->eof = (i == o->n);
|
||
|
|
|
||
|
|
printf("I %08x\n", j->x);
|
||
|
|
|
||
|
|
if (hts_tpool_dispatch(o->p, o->q1, pipe_stage1, j) != 0) {
|
||
|
|
free(j);
|
||
|
|
pthread_exit((void *)1);
|
||
|
|
}
|
||
|
|
}
|
||
|
|
|
||
|
|
pthread_exit(NULL);
|
||
|
|
}
|
||
|
|
|
||
|
|
static void *pipe_stage1(void *arg) {
|
||
|
|
pipe_job *j = (pipe_job *)arg;
|
||
|
|
|
||
|
|
j->x <<= 8;
|
||
|
|
usleep(random() % 10000); // fast job
|
||
|
|
printf("1 %08x\n", j->x);
|
||
|
|
|
||
|
|
return j;
|
||
|
|
}
|
||
|
|
|
||
|
|
static void *pipe_stage1to2(void *arg) {
|
||
|
|
pipe_opt *o = (pipe_opt *)arg;
|
||
|
|
hts_tpool_result *r;
|
||
|
|
|
||
|
|
while ((r = hts_tpool_next_result_wait(o->q1))) {
|
||
|
|
pipe_job *j = (pipe_job *)hts_tpool_result_data(r);
|
||
|
|
hts_tpool_delete_result(r, 0);
|
||
|
|
if (hts_tpool_dispatch(j->o->p, j->o->q2, pipe_stage2, j) != 0)
|
||
|
|
pthread_exit((void *)1);
|
||
|
|
if (j->eof)
|
||
|
|
break;
|
||
|
|
}
|
||
|
|
|
||
|
|
pthread_exit(NULL);
|
||
|
|
}
|
||
|
|
|
||
|
|
static void *pipe_stage2(void *arg) {
|
||
|
|
pipe_job *j = (pipe_job *)arg;
|
||
|
|
|
||
|
|
j->x <<= 8;
|
||
|
|
usleep(random() % 100000); // slow job
|
||
|
|
printf("2 %08x\n", j->x);
|
||
|
|
|
||
|
|
return j;
|
||
|
|
}
|
||
|
|
|
||
|
|
static void *pipe_stage2to3(void *arg) {
|
||
|
|
pipe_opt *o = (pipe_opt *)arg;
|
||
|
|
hts_tpool_result *r;
|
||
|
|
|
||
|
|
while ((r = hts_tpool_next_result_wait(o->q2))) {
|
||
|
|
pipe_job *j = (pipe_job *)hts_tpool_result_data(r);
|
||
|
|
hts_tpool_delete_result(r, 0);
|
||
|
|
if (hts_tpool_dispatch(j->o->p, j->o->q3, pipe_stage3, j) != 0)
|
||
|
|
pthread_exit((void *)1);
|
||
|
|
if (j->eof)
|
||
|
|
break;
|
||
|
|
}
|
||
|
|
|
||
|
|
pthread_exit(NULL);
|
||
|
|
}
|
||
|
|
|
||
|
|
static void *pipe_stage3(void *arg) {
|
||
|
|
pipe_job *j = (pipe_job *)arg;
|
||
|
|
|
||
|
|
usleep(random() % 10000); // fast job
|
||
|
|
j->x <<= 8;
|
||
|
|
return j;
|
||
|
|
}
|
||
|
|
|
||
|
|
static void *pipe_output_thread(void *arg) {
|
||
|
|
pipe_opt *o = (pipe_opt *)arg;
|
||
|
|
hts_tpool_result *r;
|
||
|
|
|
||
|
|
while ((r = hts_tpool_next_result_wait(o->q3))) {
|
||
|
|
pipe_job *j = (pipe_job *)hts_tpool_result_data(r);
|
||
|
|
int eof = j->eof;
|
||
|
|
printf("O %08x\n", j->x);
|
||
|
|
hts_tpool_delete_result(r, 1);
|
||
|
|
if (eof)
|
||
|
|
break;
|
||
|
|
}
|
||
|
|
|
||
|
|
pthread_exit(NULL);
|
||
|
|
}
|
||
|
|
|
||
|
|
int test_pipe(int n) {
|
||
|
|
hts_tpool *p = hts_tpool_init(n);
|
||
|
|
hts_tpool_process *q1 = hts_tpool_process_init(p, n*2, 0);
|
||
|
|
hts_tpool_process *q2 = hts_tpool_process_init(p, n*2, 0);
|
||
|
|
hts_tpool_process *q3 = hts_tpool_process_init(p, n*2, 0);
|
||
|
|
pipe_opt o = {p, q1, q2, q3, TASK_SIZE};
|
||
|
|
pthread_t tidIto1, tid1to2, tid2to3, tid3toO;
|
||
|
|
void *retv;
|
||
|
|
int ret;
|
||
|
|
|
||
|
|
// Launch our data source and sink threads.
|
||
|
|
pthread_create(&tidIto1, NULL, pipe_input_thread, &o);
|
||
|
|
pthread_create(&tid1to2, NULL, pipe_stage1to2, &o);
|
||
|
|
pthread_create(&tid2to3, NULL, pipe_stage2to3, &o);
|
||
|
|
pthread_create(&tid3toO, NULL, pipe_output_thread, &o);
|
||
|
|
|
||
|
|
// Wait for tasks to finish.
|
||
|
|
ret = 0;
|
||
|
|
pthread_join(tidIto1, &retv); ret |= (retv != NULL);
|
||
|
|
pthread_join(tid1to2, &retv); ret |= (retv != NULL);
|
||
|
|
pthread_join(tid2to3, &retv); ret |= (retv != NULL);
|
||
|
|
pthread_join(tid3toO, &retv); ret |= (retv != NULL);
|
||
|
|
printf("Return value %d\n", ret);
|
||
|
|
|
||
|
|
hts_tpool_process_destroy(q1);
|
||
|
|
hts_tpool_process_destroy(q2);
|
||
|
|
hts_tpool_process_destroy(q3);
|
||
|
|
hts_tpool_destroy(p);
|
||
|
|
|
||
|
|
return 0;
|
||
|
|
}
|
||
|
|
|
||
|
|
/*-----------------------------------------------------------------------------*/
|
||
|
|
/*
 * Test program entry point.
 *
 * Usage: prog command n_threads
 *
 * Runs the selected test and returns its result.  Exits with status 1 on
 * a usage error, an invalid thread count or an unknown command.
 */
int main(int argc, char **argv) {
    long n;
    char *end;

    // Fixed seed, so the random delays repeat from run to run.
    srandom(0);

    if (argc < 3) {
        fprintf(stderr, "Usage: %s command n_threads\n", argv[0]);
        fprintf(stderr, "Where commands are:\n\n");
        fprintf(stderr, "unordered       # Unordered output\n");
        fprintf(stderr, "ordered1        # Main thread with non-block API\n");
        fprintf(stderr, "ordered2        # Dispatch thread, blocking API\n");
        fprintf(stderr, "pipe            # Multi-stage pipeline, several queues\n");
        exit(1);
    }

    // atoi() returns 0 for non-numeric input and reports no errors, so
    // the count is parsed with strtol() and validated instead.  It must
    // be positive.  The tests use n*2 as the queue size, so it is also
    // capped at INT_MAX / 2 to keep that product in range.
    errno = 0;
    n = strtol(argv[2], &end, 10);
    if (errno != 0 || end == argv[2] || *end != '\0' ||
        n < 1 || n > INT_MAX / 2) {
        fprintf(stderr, "Invalid n_threads '%s'\n", argv[2]);
        exit(1);
    }

    if (strcmp(argv[1], "unordered") == 0) return test_square_u((int)n);
    if (strcmp(argv[1], "ordered1") == 0)  return test_square((int)n);
    if (strcmp(argv[1], "ordered2") == 0)  return test_squareB((int)n);
    if (strcmp(argv[1], "pipe") == 0)      return test_pipe((int)n);

    fprintf(stderr, "Unknown sub-command\n");
    exit(1);
}
|
||
|
|
#endif
|