nginx-0.0.7-2004-07-05-10:55:54 import

This commit is contained in:
Igor Sysoev 2004-07-05 06:55:54 +00:00
parent 039459ed8c
commit 0c6c115f27
19 changed files with 789 additions and 480 deletions

View file

@ -65,7 +65,7 @@ else
ngx_lib="rsaref md library"
ngx_lib_test="MD5_CTX md5; MD5Init(&md5)"
#ngx_libs=-lmd
ngx_libs=-lmd
. auto/lib/test
fi

View file

@ -54,11 +54,29 @@ if [ \( $version -lt 500000 -a $version -ge 430000 \) \
-o $version -ge 500018 ]
then
echo " + using kqueue's NOTE_LOWAT"
have=HAVE_LOWAT_EVENT . auto/have
fi
if [ $USE_THREADS = "rfork" ]; then
echo " + using rfork()"
# kqueue's EVFILT_SIGNAL is safe
if [ $version -gt 460101 ]; then
echo " + kqueue's EVFILT_SIGNAL is safe"
have=HAVE_SAFE_EVFILT_SIGNAL . auto/have
else
echo "$0: error: the kqueue's EVFILT_SIGNAL is unsafe on this"
echo "FreeBSD version, so --with-threads=rfork could not be used"
echo
exit 1
fi
fi
if [ $EVENT_AIO = YES ]; then
have=HAVE_AIO . auto/have
EVENT_MODULES="$EVENT_MODULES $AIO_MODULE"

View file

@ -135,9 +135,12 @@ POSIX_DEPS=src/os/unix/ngx_posix_config.h
FREEBSD_DEPS=src/os/unix/ngx_freebsd_config.h
FREEBSD_SRCS=src/os/unix/ngx_freebsd_init.c
FREEBSD_SENDFILE_SRCS=src/os/unix/ngx_freebsd_sendfile_chain.c
FREEBSD_RFORK_DEPS="src/os/unix/ngx_freebsd_rfork_thread.h"
FREEBSD_RFORK_SRCS="src/os/unix/ngx_freebsd_rfork_thread.c"
FREEBSD_RFORK_THREAD_SRCS="src/os/unix/rfork_thread.S"
PTHREAD_SRCS="src/os/unix/ngx_pthread_thread.c"
LINUX_DEPS=src/os/unix/ngx_linux_config.h
LINUX_SRCS=src/os/unix/ngx_linux_init.c
LINUX_SENDFILE_SRCS=src/os/unix/ngx_linux_sendfile_chain.c

View file

@ -1,13 +1,51 @@
if [ $USE_THREADS = "rfork" ]; then
case $USE_THREADS in
rfork)
have=NGX_THREADS . auto/have
have=NGX_USE_RFORK . auto/have
CORE_DEPS="$CORE_DEPS $FREEBSD_RFORK_DEPS"
CORE_SRCS="$CORE_SRCS $FREEBSD_RFORK_SRCS"
have=NGX_THREADS . auto/have
have=NGX_USE_RFORK . auto/have
CORE_DEPS="$CORE_DEPS $UNIX_THREADS_DEPS"
CORE_SRCS="$CORE_SRCS $FREEBSD_RFORK_SRCS"
case $PLATFORM in
*:i386)
if [ \( $version -gt 500000 -a $version -lt 501000 \) \
-o $version -lt 491000 ]
then
CORE_SRCS="$CORE_SRCS $FREEBSD_RFORK_THREAD_SRCS"
fi
;;
esac
;;
if [ $version -lt 501000 ]; then
CORE_SRCS="$CORE_SRCS $FREEBSD_RFORK_THREAD_SRCS"
fi
pthread)
have=NGX_THREADS . auto/have
CORE_SRCS="$CORE_SRCS $PTHREAD_SRCS"
CORE_LIBS="$CORE_LIBS -lpthread"
;;
fi
freebsd4)
have=NGX_THREADS . auto/have
CFLAGS="$CFLAGS -pthread"
CORE_SRCS="$CORE_SRCS $PTHREAD_SRCS"
CORE_LIBS="$CORE_LIBS -pthread"
;;
lc_r)
have=NGX_THREADS . auto/have
CORE_SRCS="$CORE_SRCS $PTHREAD_SRCS"
CORE_LIBS="$CORE_LIBS -lc_r"
;;
lthr)
have=NGX_THREADS . auto/have
CORE_SRCS="$CORE_SRCS $PTHREAD_SRCS"
CORE_LIBS="$CORE_LIBS -lthr"
;;
lkse)
have=NGX_THREADS . auto/have
CORE_SRCS="$CORE_SRCS $PTHREAD_SRCS"
CORE_LIBS="$CORE_LIBS -lkse"
;;
esac

View file

@ -35,6 +35,24 @@ static ngx_command_t ngx_core_commands[] = {
offsetof(ngx_core_conf_t, worker_processes),
NULL },
#if (NGX_THREADS)
{ ngx_string("worker_threads"),
NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE1,
ngx_conf_set_num_slot,
0,
offsetof(ngx_core_conf_t, worker_threads),
NULL },
{ ngx_string("thread_stack_size"),
NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE1,
ngx_conf_set_size_slot,
0,
offsetof(ngx_core_conf_t, thread_stack_size),
NULL },
#endif
{ ngx_string("user"),
NGX_MAIN_CONF|NGX_DIRECT_CONF|NGX_CONF_TAKE12,
ngx_set_user,
@ -106,12 +124,6 @@ int main(int argc, char *const *argv)
ctx.argc = argc;
ctx.argv = argv;
#if (NGX_THREADS)
if (ngx_time_mutex_init(log) == NGX_ERROR) {
return 1;
}
#endif
if (ngx_getopt(&ctx, &init_cycle) == NGX_ERROR) {
return 1;
}
@ -341,6 +353,10 @@ static void *ngx_core_module_create_conf(ngx_cycle_t *cycle)
ccf->daemon = NGX_CONF_UNSET;
ccf->master = NGX_CONF_UNSET;
ccf->worker_processes = NGX_CONF_UNSET;
#if (NGX_THREADS)
ccf->worker_threads = NGX_CONF_UNSET;
ccf->thread_stack_size = NGX_CONF_UNSET;
#endif
ccf->user = (ngx_uid_t) NGX_CONF_UNSET;
ccf->group = (ngx_gid_t) NGX_CONF_UNSET;
@ -356,6 +372,12 @@ static char *ngx_core_module_init_conf(ngx_cycle_t *cycle, void *conf)
ngx_conf_init_value(ccf->master, 1);
ngx_conf_init_value(ccf->worker_processes, 1);
#if (NGX_THREADS)
ngx_conf_init_value(ccf->worker_threads, 0);
ngx_threads_n = ccf->worker_threads;
ngx_conf_init_size_value(ccf->thread_stack_size, 2 * 1024 * 1024);
#endif
#if !(WIN32)
/* TODO: default "nobody" user */

View file

@ -40,6 +40,12 @@ typedef struct {
ngx_str_t pid;
ngx_str_t newpid;
#if (NGX_THREADS)
ngx_int_t worker_threads;
size_t thread_stack_size;
#endif
} ngx_core_conf_t;

View file

@ -279,9 +279,9 @@ static ngx_int_t ngx_kqueue_del_event(ngx_event_t *ev, int event, u_int flags)
static ngx_int_t ngx_kqueue_set_event(ngx_event_t *ev, int filter, u_int flags)
{
struct kevent *kev, kv;
struct timespec ts;
ngx_connection_t *c;
struct kevent *kev, kv;
c = ev->data;
@ -370,7 +370,7 @@ static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle)
for ( ;; ) {
timer = ngx_event_find_timer();
#if (NGX_THREADS)
#if (NGX_THREADS0)
if (timer == NGX_TIMER_ERROR) {
return NGX_ERROR;
}
@ -621,7 +621,7 @@ static ngx_int_t ngx_kqueue_process_events(ngx_cycle_t *cycle)
if (ngx_posted_events) {
if (ngx_threaded) {
ngx_cond_signal(ngx_posted_events_cv);
ngx_wakeup_worker_thread(cycle);
} else {
ngx_event_process_posted(cycle);

View file

@ -91,7 +91,7 @@ ngx_module_t ngx_events_module = {
ngx_events_commands, /* module directives */
NGX_CORE_MODULE, /* module type */
NULL, /* init module */
NULL /* init child */
NULL /* init process */
};

View file

@ -8,7 +8,6 @@ ngx_thread_volatile ngx_event_t *ngx_posted_events;
#if (NGX_THREADS)
ngx_mutex_t *ngx_posted_events_mutex;
ngx_cond_t *ngx_posted_events_cv;
#endif
@ -57,6 +56,19 @@ void ngx_event_process_posted(ngx_cycle_t *cycle)
#if (NGX_THREADS)
void ngx_wakeup_worker_thread(ngx_cycle_t *cycle)
{
ngx_int_t i;
for (i = 0; i < ngx_threads_n; i++) {
if (ngx_threads[i].state == NGX_THREAD_FREE) {
ngx_cond_signal(ngx_threads[i].cv);
return;
}
}
}
ngx_int_t ngx_event_thread_process_posted(ngx_cycle_t *cycle)
{
ngx_event_t *ev;
@ -71,7 +83,6 @@ ngx_int_t ngx_event_thread_process_posted(ngx_cycle_t *cycle)
"posted event " PTR_FMT, ev);
if (ev == NULL) {
ngx_mutex_unlock(ngx_posted_events_mutex);
return NGX_OK;
}
@ -144,4 +155,10 @@ ngx_int_t ngx_event_thread_process_posted(ngx_cycle_t *cycle)
}
}
#else
void ngx_wakeup_worker_thread(ngx_cycle_t *cycle)
{
}
#endif

View file

@ -26,6 +26,7 @@
void ngx_event_process_posted(ngx_cycle_t *cycle);
void ngx_wakeup_worker_thread(ngx_cycle_t *cycle);
extern ngx_thread_volatile ngx_event_t *ngx_posted_events;
@ -34,7 +35,6 @@ extern ngx_thread_volatile ngx_event_t *ngx_posted_events;
ngx_int_t ngx_event_thread_process_posted(ngx_cycle_t *cycle);
extern ngx_mutex_t *ngx_posted_events_mutex;
extern ngx_cond_t *ngx_posted_events_cv;
#endif

View file

@ -10,18 +10,20 @@
/*
* The threads implementation uses the rfork(RFPROC|RFTHREAD|RFMEM) syscall
* to create threads. All threads use the stacks of the same size mmap()ed
* below the main stack. Thus the current thread id is determinated through
* the stack pointer.
* below the main stack. Thus the current thread id is determinated via
* the stack pointer value.
*
* The mutex implementation uses the ngx_atomic_cmp_set() operation
* to acquire a mutex and the SysV semaphore to wait on a mutex or to wake up
* to acquire a mutex and the SysV semaphore to wait on a mutex and to wake up
* the waiting threads. The light mutex does not use semaphore, so after
* spinning in the lock the thread calls sched_yield(). However the light
* mutecies are intended to be used with the "trylock" operation only.
* The SysV semop() is a cheap syscall, particularly if it has little sembuf's
* and does not use SEM_UNDO.
*
* The condition variable implementation uses the SysV semaphore set of two
* semaphores. The first is used by the CV mutex, and the second is used
* by the CV to signal.
* The condition variable implementation uses signal #64. The signal handler
* is SIG_IGN so the kill() is a cheap syscall. The thread waits a signal
* in kevent(). The use of the EVFILT_SIGNAL is safe since FreeBSD 4.7.
*
* This threads implementation currently works on i386 (486+) and amd64
* platforms only.
@ -76,7 +78,7 @@ void _spinlock(ngx_atomic_t *lock)
for ( ;; ) {
if (*lock) {
if (ngx_freebsd_hw_ncpu > 1 && tries++ < 1000) {
if (ngx_ncpu > 1 && tries++ < 1000) {
continue;
}
@ -110,7 +112,7 @@ void _spinunlock(ngx_atomic_t *lock)
#endif
int ngx_create_thread(ngx_tid_t *tid, int (*func)(void *arg), void *arg,
int ngx_create_thread(ngx_tid_t *tid, void* (*func)(void *arg), void *arg,
ngx_log_t *log)
{
int id, err;
@ -144,15 +146,10 @@ int ngx_create_thread(ngx_tid_t *tid, int (*func)(void *arg), void *arg,
ngx_log_debug2(NGX_LOG_DEBUG_CORE, log, 0,
"thread stack: " PTR_FMT "-" PTR_FMT, stack, stack_top);
#if 1
id = rfork_thread(RFPROC|RFTHREAD|RFMEM, stack_top, func, arg);
#elif 1
id = rfork_thread(RFPROC|RFMEM, stack_top, func, arg);
#elif 1
id = rfork_thread(RFFDG|RFCFDG, stack_top, func, arg);
#else
id = rfork(RFFDG|RFCFDG);
#endif
ngx_set_errno(0);
id = rfork_thread(RFPROC|RFTHREAD|RFMEM, stack_top,
(ngx_rfork_thread_func_pt) func, arg);
err = ngx_errno;
@ -174,10 +171,23 @@ int ngx_create_thread(ngx_tid_t *tid, int (*func)(void *arg), void *arg,
ngx_int_t ngx_init_threads(int n, size_t size, ngx_cycle_t *cycle)
{
size_t len;
char *red_zone, *zone;
char *red_zone, *zone;
size_t len;
ngx_int_t i;
struct sigaction sa;
max_threads = n;
max_threads = n + 1;
for (i = 0; i < n; i++) {
ngx_memzero(&sa, sizeof(struct sigaction));
sa.sa_handler = SIG_IGN;
sigemptyset(&sa.sa_mask);
if (sigaction(NGX_CV_SIGNAL, &sa, NULL) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"sigaction(%d, SIG_IGN) failed", NGX_CV_SIGNAL);
return NGX_ERROR;
}
}
len = sizeof(ngx_freebsd_kern_usrstack);
if (sysctlbyname("kern.usrstack", &ngx_freebsd_kern_usrstack, &len,
@ -249,14 +259,6 @@ ngx_tid_t ngx_thread_self()
return ngx_pid;
}
#if 0
if (tids[tid] == 0) {
pid = ngx_pid;
tids[tid] = pid;
return pid;
}
#endif
return tids[tid];
}
@ -301,7 +303,7 @@ ngx_mutex_t *ngx_mutex_init(ngx_log_t *log, uint flags)
}
void ngx_mutex_done(ngx_mutex_t *m)
void ngx_mutex_destroy(ngx_mutex_t *m)
{
if (semctl(m->semid, 0, IPC_RMID) == -1) {
ngx_log_error(NGX_LOG_ALERT, m->log, ngx_errno,
@ -538,43 +540,26 @@ ngx_int_t ngx_mutex_unlock(ngx_mutex_t *m)
ngx_cond_t *ngx_cond_init(ngx_log_t *log)
{
ngx_cond_t *cv;
union semun op;
ngx_cond_t *cv;
if (!(cv = ngx_alloc(sizeof(ngx_cond_t), log))) {
return NULL;
}
cv->signo = NGX_CV_SIGNAL;
cv->tid = 0;
cv->log = log;
cv->semid = semget(IPC_PRIVATE, 2, SEM_R|SEM_A);
if (cv->semid == -1) {
ngx_log_error(NGX_LOG_ALERT, log, ngx_errno, "semget() failed");
return NULL;
}
op.val = 0;
if (semctl(cv->semid, 0, SETVAL, op) == -1) {
ngx_log_error(NGX_LOG_ALERT, log, ngx_errno, "semctl(SETVAL) failed");
if (semctl(cv->semid, 0, IPC_RMID) == -1) {
ngx_log_error(NGX_LOG_ALERT, log, ngx_errno,
"semctl(IPC_RMID) failed");
}
return NULL;
}
cv->kq = -1;
return cv;
}
void ngx_cond_done(ngx_cond_t *cv)
void ngx_cond_destroy(ngx_cond_t *cv)
{
if (semctl(cv->semid, 0, IPC_RMID) == -1) {
if (close(cv->kq) == -1) {
ngx_log_error(NGX_LOG_ALERT, cv->log, ngx_errno,
"semctl(IPC_RMID) failed");
"kqueue close() failed");
}
ngx_free(cv);
@ -583,21 +568,101 @@ void ngx_cond_done(ngx_cond_t *cv)
ngx_int_t ngx_cond_wait(ngx_cond_t *cv, ngx_mutex_t *m)
{
struct sembuf op;
int n;
ngx_err_t err;
struct kevent kev;
struct timespec ts;
ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0,
"cv " PTR_FMT " wait", cv);
if (cv->kq == -1) {
op.sem_num = 0;
op.sem_op = -1;
op.sem_flg = 0;
/*
* We have to add the EVFILT_SIGNAL filter in the rfork()ed thread.
* Otherwise the thread would not get a signal event.
*
* However, we have not to open the kqueue in the thread,
* it is simply handy do it together.
*/
if (semop(cv->semid, &op, 1) == -1) {
ngx_log_error(NGX_LOG_ALERT, cv->log, ngx_errno,
"semop() failed while waiting on cv " PTR_FMT, cv);
cv->kq = kqueue();
if (cv->kq == -1) {
ngx_log_error(NGX_LOG_ALERT, cv->log, ngx_errno, "kqueue() failed");
return NGX_ERROR;
}
ngx_log_debug2(NGX_LOG_DEBUG_CORE, cv->log, 0,
"cv kq:%d signo:%d", cv->kq, cv->signo);
kev.ident = cv->signo;
kev.filter = EVFILT_SIGNAL;
kev.flags = EV_ADD;
kev.fflags = 0;
kev.data = 0;
kev.udata = NULL;
ts.tv_sec = 0;
ts.tv_nsec = 0;
if (kevent(cv->kq, &kev, 1, NULL, 0, &ts) == -1) {
ngx_log_error(NGX_LOG_ALERT, cv->log, ngx_errno, "kevent() failed");
return NGX_ERROR;
}
}
if (ngx_mutex_unlock(m) == NGX_ERROR) {
return NGX_ERROR;
}
ngx_log_debug3(NGX_LOG_DEBUG_CORE, cv->log, 0,
"cv " PTR_FMT " wait, kq:%d, signo:%d",
cv, cv->kq, cv->signo);
for ( ;; ) {
n = kevent(cv->kq, NULL, 0, &kev, 1, NULL);
ngx_log_debug2(NGX_LOG_DEBUG_CORE, cv->log, 0,
"cv " PTR_FMT " kevent: %d", cv, n);
if (n == -1) {
err = ngx_errno;
ngx_log_error((err == NGX_EINTR) ? NGX_LOG_INFO : NGX_LOG_ALERT,
cv->log, ngx_errno,
"kevent() failed while waiting condition variable "
PTR_FMT, cv);
if (err == NGX_EINTR) {
break;
}
return NGX_ERROR;
}
if (n == 0) {
ngx_log_error(NGX_LOG_ALERT, cv->log, 0,
"kevent() returned no events "
"while waiting condition variable " PTR_FMT,
cv);
continue;
}
if (kev.filter != EVFILT_SIGNAL) {
ngx_log_error(NGX_LOG_ALERT, cv->log, 0,
"kevent() returned unexpected events: %d "
"while waiting condition variable " PTR_FMT,
kev.filter, cv);
continue;
}
if (kev.ident != (uintptr_t) cv->signo) {
ngx_log_error(NGX_LOG_ALERT, cv->log, 0,
"kevent() returned unexpected signal: %d ",
"while waiting condition variable " PTR_FMT,
kev.ident, cv);
continue;
}
break;
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0,
"cv " PTR_FMT " is waked up", cv);
@ -611,18 +676,14 @@ ngx_int_t ngx_cond_wait(ngx_cond_t *cv, ngx_mutex_t *m)
ngx_int_t ngx_cond_signal(ngx_cond_t *cv)
{
struct sembuf op;
ngx_log_debug3(NGX_LOG_DEBUG_CORE, cv->log, 0,
"cv " PTR_FMT " to signal " PID_T_FMT " %d",
cv, cv->tid, cv->signo);
ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0,
"cv " PTR_FMT " to signal", cv);
op.sem_num = 0;
op.sem_op = 1;
op.sem_flg = 0;
if (semop(cv->semid, &op, 1) == -1) {
if (kill(cv->tid, cv->signo) == -1) {
ngx_log_error(NGX_LOG_ALERT, cv->log, ngx_errno,
"semop() failed while signaling cv " PTR_FMT, cv);
"kill() failed while signaling condition variable "
PTR_FMT, cv);
return NGX_ERROR;
}

View file

@ -0,0 +1,93 @@
#ifndef _NGX_FREEBSD_RFORK_THREAD_H_INCLUDED_
#define _NGX_FREEBSD_RFORK_THREAD_H_INCLUDED_
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sched.h>
typedef pid_t ngx_tid_t;
#undef ngx_log_pid
#define ngx_log_pid ngx_thread_self()
#define ngx_log_tid 0
#define TID_T_FMT PID_T_FMT
#define NGX_MUTEX_LIGHT 1
#define NGX_MUTEX_LOCK_BUSY 0x80000000
typedef volatile struct {
ngx_atomic_t lock;
ngx_log_t *log;
int semid;
} ngx_mutex_t;
#define NGX_CV_SIGNAL 64
typedef struct {
int signo;
int kq;
ngx_tid_t tid;
ngx_log_t *log;
} ngx_cond_t;
#define ngx_thread_sigmask(how, set, oset) \
(sigprocmask(how, set, oset) == -1) ? ngx_errno : 0
#define ngx_thread_sigmask_n "sigprocmask()"
#define ngx_thread_join(t, p)
#define ngx_setthrtitle(n) setproctitle(n)
extern char *ngx_freebsd_kern_usrstack;
extern size_t ngx_thread_stack_size;
static inline int ngx_gettid()
{
char *sp;
if (ngx_thread_stack_size == 0) {
return 0;
}
#if ( __i386__ )
__asm__ volatile ("mov %%esp, %0" : "=q" (sp));
#elif ( __amd64__ )
__asm__ volatile ("mov %%rsp, %0" : "=q" (sp));
#else
#error "rfork()ed threads are not supported on this platform"
#endif
return (ngx_freebsd_kern_usrstack - sp) / ngx_thread_stack_size;
}
ngx_tid_t ngx_thread_self();
#define ngx_thread_main() (ngx_gettid() == 0)
#define ngx_mutex_trylock(m) ngx_mutex_dolock(m, 1)
#define ngx_mutex_lock(m) ngx_mutex_dolock(m, 0)
ngx_int_t ngx_mutex_dolock(ngx_mutex_t *m, ngx_int_t try);
ngx_int_t ngx_mutex_unlock(ngx_mutex_t *m);
typedef int (*ngx_rfork_thread_func_pt)(void *arg);
#endif /* _NGX_FREEBSD_RFORK_THREAD_H_INCLUDED_ */

View file

@ -1,231 +0,0 @@
void ngx_posix_master_cycle(ngx_cycle_t *cycle)
{
static ngx_int_t sent;
static ngx_msec_t delay = 125;
if (ngx_process == NGX_PROCESS_MASTER) {
if (sent) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"sent signal cycle");
if (sigprocmask(SIG_UNBLOCK, &set, NULL) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"sigprocmask() failed");
continue;
}
/*
* there is very big chance that the pending signals
* would be delivered right on the sigprocmask() return
*/
if (!ngx_signal) {
if (delay < 15000) {
delay *= 2;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"msleep %d", delay);
ngx_msleep(delay);
ngx_gettimeofday(&tv);
ngx_time_update(tv.tv_sec);
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"wake up");
}
if (sigprocmask(SIG_BLOCK, &set, NULL) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"sigprocmask() failed");
}
ngx_signal = 0;
} else {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"sigsuspend");
sigsuspend(&wset);
ngx_gettimeofday(&tv);
ngx_time_update(tv.tv_sec);
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"wake up");
}
} else { /* NGX_PROCESS_SINGLE */
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"worker cycle");
ngx_process_events(cycle->log);
}
if (ngx_reap) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"reap childs");
live = 0;
for (i = 0; i < ngx_last_process; i++) {
ngx_log_debug6(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"child: " PID_T_FMT
" s:%d e:%d t:%d d:%d r:%d",
ngx_processes[i].pid,
ngx_processes[i].signal,
ngx_processes[i].exiting,
ngx_processes[i].exited,
ngx_processes[i].detached,
ngx_processes[i].respawn);
if (ngx_processes[i].exited) {
if (ngx_processes[i].respawn
&& !ngx_processes[i].exiting
&& !ngx_terminate
&& !ngx_quit)
{
if (ngx_spawn_process(cycle,
ngx_processes[i].proc,
ngx_processes[i].data,
ngx_processes[i].name, i)
== NGX_ERROR)
{
ngx_log_error(NGX_LOG_ALERT, cycle->log, 0,
"can not respawn %s",
ngx_processes[i].name);
}
continue;
}
if (ngx_processes[i].pid == ngx_new_binary) {
ngx_new_binary = 0;
}
if (i != --ngx_last_process) {
ngx_processes[i--] =
ngx_processes[ngx_last_process];
}
} else if (!ngx_processes[i].detached
&& (ngx_terminate || ngx_quit))
{
live = 1;
} else if (ngx_processes[i].exiting) {
live = 1;
}
}
if (!live) {
if (ngx_terminate || ngx_quit) {
if (ngx_inherited && getppid() > 1) {
name = ctx->pid.name.data;
} else {
name = ctx->name;
}
if (ngx_delete_file(name) == NGX_FILE_ERROR) {
ngx_log_error(NGX_LOG_ALERT, cycle->log,
ngx_errno,
ngx_delete_file_n
" \"%s\" failed", name);
}
ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "exit");
exit(0);
} else {
sent = 0;
}
}
}
if (ngx_terminate) {
if (delay > 10000) {
signo = SIGKILL;
} else {
signo = ngx_signal_value(NGX_TERMINATE_SIGNAL);
}
} else if (ngx_quit) {
signo = ngx_signal_value(NGX_SHUTDOWN_SIGNAL);
} else {
if (ngx_noaccept) {
signo = ngx_signal_value(NGX_SHUTDOWN_SIGNAL);
}
if (ngx_change_binary) {
ngx_change_binary = 0;
ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "changing binary");
ngx_new_binary = ngx_exec_new_binary(cycle, ctx->argv);
}
if (ngx_reconfigure) {
signo = ngx_signal_value(NGX_SHUTDOWN_SIGNAL);
ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "reconfiguring");
}
if (ngx_reopen) {
/* STUB */
signo = ngx_signal_value(NGX_SHUTDOWN_SIGNAL);
ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "reopening logs");
ngx_reopen_files(cycle);
}
}
if (signo) {
for (i = 0; i < ngx_last_process; i++) {
if (!ngx_processes[i].detached) {
ngx_processes[i].signal = signo;
ngx_log_debug2(NGX_LOG_DEBUG_EVENT,
cycle->log, 0,
"signal " PID_T_FMT " %d",
ngx_processes[i].pid, signo);
}
}
delay = 125;
signo = 0;
}
for (i = 0; i < ngx_last_process; i++) {
if (ngx_processes[i].signal == 0) {
continue;
}
if (ccf->kqueue_signal != 1) {
sent = 1;
}
ngx_log_debug2(NGX_LOG_DEBUG_CORE, cycle->log, 0,
"kill (" PID_T_FMT ", %d)" ,
ngx_processes[i].pid,
ngx_processes[i].signal);
if (kill(ngx_processes[i].pid, ngx_processes[i].signal) == -1) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"kill(%d, %d) failed",
ngx_processes[i].pid, ngx_processes[i].signal);
continue;
}
if (ngx_processes[i].signal != ngx_signal_value(NGX_REOPEN_SIGNAL)) {
ngx_processes[i].exiting = 1;
}
}
}

View file

@ -13,7 +13,8 @@ static void ngx_master_exit(ngx_cycle_t *cycle, ngx_master_ctx_t *ctx);
static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data);
static void ngx_channel_handler(ngx_event_t *ev);
#if (NGX_THREADS)
static int ngx_worker_thread_cycle(void *data);
static void ngx_wakeup_worker_threads(ngx_cycle_t *cycle);
static void *ngx_worker_thread_cycle(void *data);
#endif
@ -40,6 +41,12 @@ ngx_uint_t ngx_noaccepting;
ngx_uint_t ngx_restart;
#if (NGX_THREADS)
volatile ngx_thread_t ngx_threads[NGX_MAX_THREADS];
ngx_int_t ngx_threads_n;
#endif
u_char master_process[] = "master process";
@ -524,9 +531,6 @@ static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
ngx_listening_t *ls;
ngx_core_conf_t *ccf;
ngx_connection_t *c;
#if (NGX_THREADS)
ngx_tid_t tid;
#endif
ngx_process = NGX_PROCESS_WORKER;
@ -620,23 +624,34 @@ static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
#if (NGX_THREADS)
if (ngx_init_threads(5, 128 * 1024 * 1024, cycle) == NGX_ERROR) {
if (ngx_time_mutex_init(cycle->log) == NGX_ERROR) {
/* fatal */
exit(2);
}
if (!(ngx_posted_events_cv = ngx_cond_init(cycle->log))) {
/* fatal */
exit(2);
}
for (i = 0; i < 2; i++) {
if (ngx_create_thread(&tid, ngx_worker_thread_cycle,
cycle, cycle->log) != 0)
if (ngx_threads_n) {
if (ngx_init_threads(ngx_threads_n,
ccf->thread_stack_size, cycle) == NGX_ERROR)
{
/* fatal */
exit(2);
}
for (n = 0; n < ngx_threads_n; n++) {
if (!(ngx_threads[n].cv = ngx_cond_init(cycle->log))) {
/* fatal */
exit(2);
}
if (ngx_create_thread((ngx_tid_t *) &ngx_threads[n].tid,
ngx_worker_thread_cycle,
(void *) &ngx_threads[n], cycle->log) != 0)
{
/* fatal */
exit(2);
}
}
}
#endif
@ -646,6 +661,14 @@ static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
&& ngx_event_timer_rbtree == &ngx_event_timer_sentinel)
{
ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "exiting");
#if (NGX_THREADS)
ngx_terminate = 1;
ngx_wakeup_worker_threads(cycle);
#endif
/*
* we do not destroy cycle->pool here because a signal handler
* that uses cycle->log can be called at this point
@ -659,6 +682,11 @@ static void ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
if (ngx_terminate) {
ngx_log_error(NGX_LOG_INFO, cycle->log, 0, "exiting");
#if (NGX_THREADS)
ngx_wakeup_worker_threads(cycle);
#endif
/*
* we do not destroy cycle->pool here because a signal handler
* that uses cycle->log can be called at this point
@ -752,14 +780,53 @@ static void ngx_channel_handler(ngx_event_t *ev)
#if (NGX_THREADS)
int ngx_worker_thread_cycle(void *data)
static void ngx_wakeup_worker_threads(ngx_cycle_t *cycle)
{
ngx_cycle_t *cycle = data;
ngx_int_t i;
ngx_uint_t live;
for ( ;; ) {
live = 0;
for (i = 0; i < ngx_threads_n; i++) {
if (ngx_threads[i].state < NGX_THREAD_EXIT) {
ngx_cond_signal(ngx_threads[i].cv);
live = 1;
}
if (ngx_threads[i].state == NGX_THREAD_EXIT) {
ngx_thread_join(ngx_threads[i].tid, NULL);
ngx_threads[i].state = NGX_THREAD_DONE;
}
}
if (live == 0) {
ngx_log_debug0(NGX_LOG_DEBUG_CORE, cycle->log, 0,
"all worker threads are joined");
/* STUB */
ngx_mutex_destroy(ngx_event_timer_mutex);
ngx_mutex_destroy(ngx_posted_events_mutex);
return;
}
ngx_sched_yield();
}
}
static void* ngx_worker_thread_cycle(void *data)
{
ngx_thread_t *thr = data;
ngx_err_t err;
sigset_t set;
ngx_err_t err;
struct timeval tv;
thr->cv->tid = ngx_thread_self();
sigemptyset(&set);
sigaddset(&set, ngx_signal_value(NGX_RECONFIGURE_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_REOPEN_SIGNAL));
@ -767,32 +834,46 @@ int ngx_worker_thread_cycle(void *data)
err = ngx_thread_sigmask(SIG_BLOCK, &set, NULL);
if (err) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, err,
ngx_log_error(NGX_LOG_ALERT, ngx_cycle->log, err,
ngx_thread_sigmask_n " failed");
return 1;
return (void *) 1;
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, ngx_errno,
"thread %d started", ngx_thread_self());
"thread " TID_T_FMT " started", ngx_thread_self());
ngx_setproctitle("worker thread");
ngx_setthrtitle("worker thread");
if (ngx_mutex_lock(ngx_posted_events_mutex) == NGX_ERROR) {
return (void *) 1;
}
for ( ;; ) {
if (ngx_cond_wait(ngx_posted_events_cv, ngx_posted_events_mutex)
thr->state = NGX_THREAD_FREE;
if (ngx_cond_wait(thr->cv, ngx_posted_events_mutex) == NGX_ERROR) {
return (void *) 1;
}
if (ngx_terminate) {
thr->state = NGX_THREAD_EXIT;
ngx_mutex_unlock(ngx_posted_events_mutex);
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, ngx_errno,
"thread %d is done", ngx_thread_self());
return (void *) 0;
}
thr->state = NGX_THREAD_BUSY;
if (ngx_event_thread_process_posted((ngx_cycle_t *) ngx_cycle)
== NGX_ERROR)
{
return 1;
}
if (ngx_event_thread_process_posted(cycle) == NGX_ERROR) {
return 1;
return (void *) 1;
}
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, ngx_cycle->log, ngx_errno,
"thread %d done", ngx_thread_self());
return 0;
}
#endif

View file

@ -14,8 +14,8 @@
typedef struct {
int argc;
char *const *argv;
int argc;
char *const *argv;
} ngx_master_ctx_t;

View file

@ -1,26 +0,0 @@
#include <ngx_config.h>
#include <ngx_log.h>
#include <ngx_pthread.h>
int ngx_create_os_thread(ngx_os_tid_t *tid, void *stack,
ngx_thread_start_routine_t func, void *arg,
ngx_log_t log)
{
int err;
pthread_attr_t *attr;
attr = NULL;
err = pthread_create(tid, attr, func, arg);
if (err != 0) {
ngx_log_error(NGX_LOG_ERR, log, err, "pthread_create() failed");
return NGX_ERROR;
}
return NGX_OK;
}

View file

@ -1,14 +0,0 @@
#ifndef _NGX_OS_THREAD_H_INCLUDED_
#define _NGX_OS_THREAD_H_INCLUDED_
#include <pthread.h>
typedef pthread_t ngx_os_tid_t;
typedef int ngx_tid_t;
typedef void *(*)(void *) ngx_thread_start_routine_t
#endif /* _NGX_OS_THREAD_H_INCLUDED_ */

View file

@ -0,0 +1,268 @@
/*
* Copyright (C) 2002-2004 Igor Sysoev, http://sysoev.ru/en/
*/
#include <ngx_config.h>
#include <ngx_core.h>
static ngx_uint_t nthreads;
static ngx_uint_t max_threads;
static pthread_attr_t thr_attr;
int ngx_create_thread(ngx_tid_t *tid, void* (*func)(void *arg), void *arg,
ngx_log_t *log)
{
int err;
if (nthreads >= max_threads) {
ngx_log_error(NGX_LOG_CRIT, log, 0,
"no more than %d threads can be created", max_threads);
return NGX_ERROR;
}
err = pthread_create(tid, &thr_attr, func, arg);
if (err != 0) {
ngx_log_error(NGX_LOG_ALERT, log, err, "pthread_create() failed");
return err;
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, log, 0,
"thread is created: " TID_T_FMT, *tid);
nthreads++;
return err;
}
ngx_int_t ngx_init_threads(int n, size_t size, ngx_cycle_t *cycle)
{
int err;
max_threads = n;
err = pthread_attr_init(&thr_attr);
if (err != 0) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, err,
"pthread_attr_init() failed");
return NGX_ERROR;
}
err = pthread_attr_setstacksize(&thr_attr, size);
if (err != 0) {
ngx_log_error(NGX_LOG_ALERT, cycle->log, err,
"pthread_attr_setstacksize() failed");
return NGX_ERROR;
}
ngx_threaded = 1;
return NGX_OK;
}
ngx_mutex_t *ngx_mutex_init(ngx_log_t *log, uint flags)
{
int err;
ngx_mutex_t *m;
if (!(m = ngx_alloc(sizeof(ngx_mutex_t), log))) {
return NULL;
}
m->log = log;
err = pthread_mutex_init(&m->mutex, NULL);
if (err != 0) {
ngx_log_error(NGX_LOG_ALERT, m->log, err,
"pthread_mutex_init() failed");
return NULL;
}
return m;
}
void ngx_mutex_destroy(ngx_mutex_t *m)
{
int err;
err = pthread_mutex_destroy(&m->mutex);
if (err != 0) {
ngx_log_error(NGX_LOG_ALERT, m->log, err,
"pthread_mutex_destroy(" PTR_FMT ") failed", m);
}
ngx_free(m);
}
ngx_int_t ngx_mutex_lock(ngx_mutex_t *m)
{
int err;
if (!ngx_threaded) {
return NGX_OK;
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0, "lock mutex " PTR_FMT, m);
err = pthread_mutex_lock(&m->mutex);
if (err != 0) {
ngx_log_error(NGX_LOG_ALERT, m->log, err,
"pthread_mutex_lock(" PTR_FMT ") failed", m);
return NGX_ERROR;
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0,
"mutex " PTR_FMT " is locked", m);
return NGX_OK;
}
ngx_int_t ngx_mutex_trylock(ngx_mutex_t *m)
{
int err;
if (!ngx_threaded) {
return NGX_OK;
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0, "try lock mutex " PTR_FMT, m);
err = pthread_mutex_trylock(&m->mutex);
if (err != 0) {
ngx_log_error(NGX_LOG_ALERT, m->log, err,
"pthread_mutex_trylock(" PTR_FMT ") failed", m);
return NGX_ERROR;
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0,
"mutex " PTR_FMT " is locked", m);
return NGX_OK;
}
ngx_int_t ngx_mutex_unlock(ngx_mutex_t *m)
{
int err;
if (!ngx_threaded) {
return NGX_OK;
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0, "unlock mutex " PTR_FMT, m);
err = pthread_mutex_unlock(&m->mutex);
if (err != 0) {
ngx_log_error(NGX_LOG_ALERT, m->log, err,
"pthread_mutex_unlock(" PTR_FMT ") failed", m);
return NGX_ERROR;
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0,
"mutex " PTR_FMT " is unlocked", m);
return NGX_OK;
}
ngx_cond_t *ngx_cond_init(ngx_log_t *log)
{
int err;
ngx_cond_t *cv;
if (!(cv = ngx_alloc(sizeof(ngx_cond_t), log))) {
return NULL;
}
cv->log = log;
err = pthread_cond_init(&cv->cond, NULL);
if (err != 0) {
ngx_log_error(NGX_LOG_ALERT, cv->log, err,
"pthread_cond_init() failed");
return NULL;
}
return cv;
}
void ngx_cond_destroy(ngx_cond_t *cv)
{
int err;
err = pthread_cond_destroy(&cv->cond);
if (err != 0) {
ngx_log_error(NGX_LOG_ALERT, cv->log, err,
"pthread_cond_destroy(" PTR_FMT ") failed", cv);
}
ngx_free(cv);
}
ngx_int_t ngx_cond_wait(ngx_cond_t *cv, ngx_mutex_t *m)
{
int err;
ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0,
"cv " PTR_FMT " wait", cv);
err = pthread_cond_wait(&cv->cond, &m->mutex);
if (err != 0) {
ngx_log_error(NGX_LOG_ALERT, cv->log, err,
"pthread_cond_wait(" PTR_FMT ") failed", cv);
return NGX_ERROR;
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0,
"cv " PTR_FMT " is waked up", cv);
ngx_log_debug1(NGX_LOG_DEBUG_CORE, m->log, 0,
"mutex " PTR_FMT " is locked", m);
return NGX_OK;
}
ngx_int_t ngx_cond_signal(ngx_cond_t *cv)
{
int err;
ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0,
"cv " PTR_FMT " to signal", cv);
err = pthread_cond_signal(&cv->cond);
if (err != 0) {
ngx_log_error(NGX_LOG_ALERT, cv->log, err,
"pthread_cond_signal(" PTR_FMT ") failed", cv);
return NGX_ERROR;
}
ngx_log_debug1(NGX_LOG_DEBUG_CORE, cv->log, 0,
"cv " PTR_FMT " is signaled", cv);
return NGX_OK;
}

View file

@ -7,111 +7,84 @@
#if (NGX_THREADS)
#define ngx_thread_volatile volatile
#define NGX_MAX_THREADS 128
#if (NGX_USE_RFORK)
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sched.h>
typedef pid_t ngx_tid_t;
#undef ngx_log_pid
#define ngx_log_pid ngx_thread_self()
#define ngx_log_tid 0
#define TID_T_FMT PID_T_FMT
#define NGX_MUTEX_LIGHT 1
#define NGX_MUTEX_CV 2
#define NGX_MUTEX_LOCK_BUSY 0x80000000
typedef volatile struct {
ngx_atomic_t lock;
ngx_log_t *log;
int semid;
} ngx_mutex_t;
typedef struct {
int semid;
ngx_log_t *log;
} ngx_cond_t;
#define ngx_thread_sigmask(how, set, oset) \
(sigprocmask(how, set, oset) == -1) ? ngx_errno : 0
#define ngx_thread_sigmask_n "sigprocmask()"
extern char *ngx_freebsd_kern_usrstack;
extern size_t ngx_thread_stack_size;
static inline int ngx_gettid()
{
char *sp;
if (ngx_thread_stack_size == 0) {
return 0;
}
#if ( __i386__ )
__asm__ volatile ("mov %%esp, %0" : "=q" (sp));
#elif ( __amd64__ )
__asm__ volatile ("mov %%rsp, %0" : "=q" (sp));
#else
#error "rfork()ed threads are not supported on this platform"
#endif
return (ngx_freebsd_kern_usrstack - sp) / ngx_thread_stack_size;
}
#define ngx_thread_main() (ngx_gettid() == 0)
#include <ngx_freebsd_rfork_thread.h>
#else /* use pthreads */
#include <pthread.h>
#include <pthread_np.h>
typedef pthread_t ngx_tid_t;
#define ngx_gettid() ((ngx_int_t) pthread_getspecific(0))
#define ngx_log_tid ngx_thread_self()
#define ngx_thread_self() pthread_self()
#define ngx_thread_main() pthread_main_np()
#define ngx_log_tid (int) ngx_thread_self()
#define TID_T_FMT PTR_FMT
#define NGX_MUTEX_LIGHT 0
typedef struct {
pthread_mutex_t mutex;
ngx_log_t *log;
} ngx_mutex_t;
typedef struct {
pthread_cond_t cond;
ngx_tid_t tid;
ngx_log_t *log;
} ngx_cond_t;
#define ngx_thread_sigmask pthread_sigmask
#define ngx_thread_sigmask_n "pthread_sigmask()"
#define ngx_thread_join(t, p) pthread_join(t, p)
#define ngx_setthrtitle(n)
ngx_int_t ngx_mutex_trylock(ngx_mutex_t *m);
ngx_int_t ngx_mutex_lock(ngx_mutex_t *m);
ngx_int_t ngx_mutex_unlock(ngx_mutex_t *m);
#endif
#define ngx_thread_volatile volatile
typedef struct {
ngx_tid_t tid;
ngx_cond_t *cv;
ngx_uint_t state;
} ngx_thread_t;
#define NGX_THREAD_FREE 1
#define NGX_THREAD_BUSY 2
#define NGX_THREAD_EXIT 3
#define NGX_THREAD_DONE 4
extern ngx_int_t ngx_threads_n;
extern volatile ngx_thread_t ngx_threads[NGX_MAX_THREADS];
ngx_int_t ngx_init_threads(int n, size_t size, ngx_cycle_t *cycle);
int ngx_create_thread(ngx_tid_t *tid, int (*func)(void *arg), void *arg,
int ngx_create_thread(ngx_tid_t *tid, void* (*func)(void *arg), void *arg,
ngx_log_t *log);
ngx_tid_t ngx_thread_self();
ngx_mutex_t *ngx_mutex_init(ngx_log_t *log, uint flags);
void ngx_mutex_done(ngx_mutex_t *m);
#define ngx_mutex_trylock(m) ngx_mutex_dolock(m, 1)
#define ngx_mutex_lock(m) ngx_mutex_dolock(m, 0)
ngx_int_t ngx_mutex_dolock(ngx_mutex_t *m, ngx_int_t try);
ngx_int_t ngx_mutex_unlock(ngx_mutex_t *m);
void ngx_mutex_destroy(ngx_mutex_t *m);
ngx_cond_t *ngx_cond_init(ngx_log_t *log);
void ngx_cond_done(ngx_cond_t *cv);
void ngx_cond_destroy(ngx_cond_t *cv);
ngx_int_t ngx_cond_wait(ngx_cond_t *cv, ngx_mutex_t *m);
ngx_int_t ngx_cond_signal(ngx_cond_t *cv);