Thread pools: keep waiting tasks mutex in ngx_thread_pool_t.
It's not needed for completed tasks queue since the previous change. No functional changes.
This commit is contained in:
parent
14b5f6e9c9
commit
3d8c16055f
1 changed files with 20 additions and 45 deletions
|
@ -17,13 +17,17 @@ typedef struct {
|
|||
|
||||
|
||||
typedef struct {
|
||||
ngx_thread_mutex_t mtx;
|
||||
ngx_thread_task_t *first;
|
||||
ngx_thread_task_t **last;
|
||||
} ngx_thread_pool_queue_t;
|
||||
|
||||
#define ngx_thread_pool_queue_init(q) \
|
||||
(q)->first = NULL; \
|
||||
(q)->last = &(q)->first
|
||||
|
||||
|
||||
struct ngx_thread_pool_s {
|
||||
ngx_thread_mutex_t mtx;
|
||||
ngx_thread_pool_queue_t queue;
|
||||
ngx_int_t waiting;
|
||||
ngx_thread_cond_t cond;
|
||||
|
@ -42,10 +46,6 @@ struct ngx_thread_pool_s {
|
|||
|
||||
static ngx_int_t ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log,
|
||||
ngx_pool_t *pool);
|
||||
static ngx_int_t ngx_thread_pool_queue_init(ngx_thread_pool_queue_t *queue,
|
||||
ngx_log_t *log);
|
||||
static ngx_int_t ngx_thread_pool_queue_destroy(ngx_thread_pool_queue_t *queue,
|
||||
ngx_log_t *log);
|
||||
static void ngx_thread_pool_destroy(ngx_thread_pool_t *tp);
|
||||
|
||||
static void *ngx_thread_pool_cycle(void *data);
|
||||
|
@ -117,12 +117,14 @@ ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
|
|||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (ngx_thread_pool_queue_init(&tp->queue, log) != NGX_OK) {
|
||||
ngx_thread_pool_queue_init(&tp->queue);
|
||||
|
||||
if (ngx_thread_mutex_create(&tp->mtx, log) != NGX_OK) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (ngx_thread_cond_create(&tp->cond, log) != NGX_OK) {
|
||||
(void) ngx_thread_pool_queue_destroy(&tp->queue, log);
|
||||
(void) ngx_thread_mutex_destroy(&tp->mtx, log);
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
|
@ -160,27 +162,6 @@ ngx_thread_pool_init(ngx_thread_pool_t *tp, ngx_log_t *log, ngx_pool_t *pool)
|
|||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_thread_pool_queue_init(ngx_thread_pool_queue_t *queue, ngx_log_t *log)
|
||||
{
|
||||
queue->first = NULL;
|
||||
queue->last = &queue->first;
|
||||
|
||||
return ngx_thread_mutex_create(&queue->mtx, log);
|
||||
}
|
||||
|
||||
|
||||
static ngx_int_t
|
||||
ngx_thread_pool_queue_destroy(ngx_thread_pool_queue_t *queue, ngx_log_t *log)
|
||||
{
|
||||
#if 0
|
||||
return ngx_thread_mutex_destroy(&queue->mtx, log);
|
||||
#else
|
||||
return NGX_OK;
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
static void
|
||||
ngx_thread_pool_destroy(ngx_thread_pool_t *tp)
|
||||
{
|
||||
|
@ -188,9 +169,9 @@ ngx_thread_pool_destroy(ngx_thread_pool_t *tp)
|
|||
|
||||
#if 0
|
||||
(void) ngx_thread_cond_destroy(&tp->cond, tp->log);
|
||||
#endif
|
||||
|
||||
(void) ngx_thread_pool_queue_destroy(&tp->queue, tp->log);
|
||||
(void) ngx_thread_mutex_destroy(&tp->mtx, tp->log);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
||||
|
@ -219,12 +200,12 @@ ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
|
|||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) {
|
||||
if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
if (tp->waiting >= tp->max_queue) {
|
||||
(void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log);
|
||||
(void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
|
||||
|
||||
ngx_log_error(NGX_LOG_ERR, tp->log, 0,
|
||||
"thread pool \"%V\" queue overflow: %i tasks waiting",
|
||||
|
@ -238,7 +219,7 @@ ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
|
|||
task->next = NULL;
|
||||
|
||||
if (ngx_thread_cond_signal(&tp->cond, tp->log) != NGX_OK) {
|
||||
(void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log);
|
||||
(void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
|
||||
return NGX_ERROR;
|
||||
}
|
||||
|
||||
|
@ -247,7 +228,7 @@ ngx_thread_task_post(ngx_thread_pool_t *tp, ngx_thread_task_t *task)
|
|||
|
||||
tp->waiting++;
|
||||
|
||||
(void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log);
|
||||
(void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
|
||||
|
||||
ngx_log_debug2(NGX_LOG_DEBUG_CORE, tp->log, 0,
|
||||
"task #%ui added to thread pool \"%V\"",
|
||||
|
@ -287,7 +268,7 @@ ngx_thread_pool_cycle(void *data)
|
|||
}
|
||||
|
||||
for ( ;; ) {
|
||||
if (ngx_thread_mutex_lock(&tp->queue.mtx, tp->log) != NGX_OK) {
|
||||
if (ngx_thread_mutex_lock(&tp->mtx, tp->log) != NGX_OK) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -295,10 +276,10 @@ ngx_thread_pool_cycle(void *data)
|
|||
tp->waiting--;
|
||||
|
||||
while (tp->queue.first == NULL) {
|
||||
if (ngx_thread_cond_wait(&tp->cond, &tp->queue.mtx, tp->log)
|
||||
if (ngx_thread_cond_wait(&tp->cond, &tp->mtx, tp->log)
|
||||
!= NGX_OK)
|
||||
{
|
||||
(void) ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log);
|
||||
(void) ngx_thread_mutex_unlock(&tp->mtx, tp->log);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
@ -310,7 +291,7 @@ ngx_thread_pool_cycle(void *data)
|
|||
tp->queue.last = &tp->queue.first;
|
||||
}
|
||||
|
||||
if (ngx_thread_mutex_unlock(&tp->queue.mtx, tp->log) != NGX_OK) {
|
||||
if (ngx_thread_mutex_unlock(&tp->mtx, tp->log) != NGX_OK) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -578,11 +559,7 @@ ngx_thread_pool_init_worker(ngx_cycle_t *cycle)
|
|||
return NGX_OK;
|
||||
}
|
||||
|
||||
if (ngx_thread_pool_queue_init(&ngx_thread_pool_done, cycle->log)
|
||||
!= NGX_OK)
|
||||
{
|
||||
return NGX_ERROR;
|
||||
}
|
||||
ngx_thread_pool_queue_init(&ngx_thread_pool_done);
|
||||
|
||||
tpp = tcf->pools.elts;
|
||||
|
||||
|
@ -621,6 +598,4 @@ ngx_thread_pool_exit_worker(ngx_cycle_t *cycle)
|
|||
for (i = 0; i < tcf->pools.nelts; i++) {
|
||||
ngx_thread_pool_destroy(tpp[i]);
|
||||
}
|
||||
|
||||
(void) ngx_thread_pool_queue_destroy(&ngx_thread_pool_done, cycle->log);
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue