Nginx 惊群效应的处理和事件调度循环
前言
工作的过程中常常听到惊群效应这一高端的名词,故怀着好奇心去源码的世界一探究竟,探究问题的所在和 nginx 的解决方案。如果下面的分析有错,希望大佬们多多指出。
Nginx进程模型
Nginx 默认采用多进程工作方式,Nginx启动后,会运行一个master进程和多个worker进程。其中master充当整个进程组与用户的交互接口,同时对进程进行监护,管理worker进程来实现重启服务、平滑升级、更换日志文件、配置文件实时生效等功能。worker用来处理基本的网络事件,worker之间是平等的,他们共同竞争来处理来自客户端的请求。
解决惊群问题
epoll 惊群
epoll 有两种工作方式:LT(水平触发) 和 ET(边缘触发)。LT 即只要有事件就通知,而 ET 则只有状态变化时才会通知。
LT 状态下,只要有通知,所有监听这个 socket 的线程都会被唤醒。
ET 状态下,内核只会通知一次(一个线程),因此无论是 accept()
、read()
还是 write()
都要循环操作到底层返回 EAGAIN
为止。
但是 ET 也会有竞争问题:线程A的 epoll_wait()
返回后,线程 A 不断的调用 accept()
处理连接请求,当内核的 accept queue
队列中的请求恰好处理完时候,内核会重新将该 socket 置为不可读状态,以便可以重新被触发;此时如果新来了一个连接,那么另外一个线程 B 可能被唤醒,然后执行accept()
操作,不过此时之前的线程 A 还需要重新再执行一次 accept()
以确认 accept queue
已经被处理完了,此时如果线程A成功 accept
的话,线程 B 就被惊醒了(线程 B 没有 accept
成功)。
历史上还存在过 accept 惊群,但现在的内核已经解决了这个问题,内核只会唤醒一个进程。
Ngnix 的解决方法
Ngnix 目前有几种方法解决惊群问题。
accept_mutex 锁
如果开启了accept_mutex
锁,每个 worker 都会先去抢自旋锁,只有抢占成功了,才把 socket 加入到 epoll 中,accept 请求,然后释放锁。accept_mutex
锁也有负载均衡的作用,接受太多请求的 worker 会根据 ngx_accept_disabled
自动放弃锁争抢的机会。
accept_mutex
效率较低,在锁释放之前都无法给到别的进行进行请求的处理,其他进程只能等待 epoll_wait() 超时唤醒。
SO_REUSEPORT 选项
SO_REUSEPORT
在 Linux Kernel 3.9+ 加入支持,Ngnix 在 1.9.1 中加入了这个选项,每个 worker 都有自己的 socket,这些 socket 都 bind
同一个端口。当新请求到来时,内核根据四元组信息进行负载均衡,相对更加高效。
EPOLLEXCLUSIVE 标识
EPOLLEXCLUSIVE
是 Linux Kernel 4.5+ 新添加的一个 epoll 的标识,Ngnix 在 1.11.3 之后添加了 NGX_EXCLUSIVE_EVENT
。
EPOLLEXCLUSIVE
标识会保证一个事件发生时候只有一个线程会被唤醒,以避免多侦听下的“惊群”问题。不过任一时候只能有一个工作线程调用 accept,在负载较低的时候可能出现某些进程比较繁忙的情况。
针对避免惊群现象的总结:Nginx 在初始化过程中会调用 ngx_event_module_init
, ngx_event_process_init
对 IO 事件处理模块和 worker 进程的事件处理循环进行初始化,以常用的 Epoll 为例,如果使用的是 ReusePort 或者 EpollExclusive 来避免惊群现象,则事先就将对事件的监听注册到系统中。使用 accept_mutex,则在后续的事件循环中通过锁的争抢,仅注册成功获得锁的 worker 进行事件处理来避免惊群现象。上面所述的三种方法都有有优势的方面,大家可以根据实际情况进行选择。
调度原理
// src/core/nginx.c
int ngx_cdecl
main(int argc, char *const *argv)
{
(...) // 进行所有初始化步骤,包括读取配置文件,初始化共享数据结构
if (ngx_process == NGX_PROCESS_SINGLE) {
ngx_single_process_cycle(cycle);
} else {
ngx_master_process_cycle(cycle); // 启动 Master,进入 Master Cycle
}
return 0;
}
// src/os/unix/ngx_process_cycle.c
void
ngx_master_process_cycle(ngx_cycle_t *cycle)
{
char *title;
u_char *p;
size_t size;
ngx_int_t i;
ngx_uint_t sigio;
sigset_t set;
struct itimerval itv;
ngx_uint_t live;
ngx_msec_t delay;
ngx_core_conf_t *ccf;
sigemptyset(&set);
sigaddset(&set, SIGCHLD);
sigaddset(&set, SIGALRM);
sigaddset(&set, SIGIO);
sigaddset(&set, SIGINT);
sigaddset(&set, ngx_signal_value(NGX_RECONFIGURE_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_REOPEN_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_NOACCEPT_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_TERMINATE_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_SHUTDOWN_SIGNAL));
sigaddset(&set, ngx_signal_value(NGX_CHANGEBIN_SIGNAL));
if (sigprocmask(SIG_BLOCK, &set, NULL) == -1) { // 将信号集合 set 加入到进程的信号阻塞集合之中去,屏蔽信号
ngx_log_error(NGX_LOG_ALERT, cycle->log, ngx_errno,
"sigprocmask() failed");
}
sigemptyset(&set); // 清空信号集
(...)
ccf = (ngx_core_conf_t *) ngx_get_conf(cycle->conf_ctx, ngx_core_module); // 获取配置
ngx_start_worker_processes(cycle, ccf->worker_processes,
NGX_PROCESS_RESPAWN); // 根据配置中的 worker process 数量启动 worker
(...)
for ( ;; ) {
(...)
sigsuspend(&set); // 挂起 Master
ngx_time_update(); // 唤醒后更新时间
// 下面是根据信号处理器设置的全局变量进行对应操作
// 包括但不限于:
// 关闭worker
// 重载配置或者更新可执行 bin
// Terminate, Quit, Reopen, Restart 事件
(...)
}
}
static void
ngx_start_worker_processes(ngx_cycle_t *cycle, ngx_int_t n, ngx_int_t type)
{
ngx_int_t i;
ngx_log_error(NGX_LOG_NOTICE, cycle->log, 0, "start worker processes");
for (i = 0; i < n; i++) {
// 根据配置的 worker 数量启动
// 启动后 worker 进入 ngx_worker_process_cycle
ngx_spawn_process(cycle, ngx_worker_process_cycle,
(void *) (intptr_t) i, "worker process", type);
ngx_pass_open_channel(cycle);
}
}
static void
ngx_worker_process_cycle(ngx_cycle_t *cycle, void *data)
{
ngx_int_t worker = (intptr_t) data;
ngx_process = NGX_PROCESS_WORKER; // 设置进程类型标识
ngx_worker = worker;
ngx_worker_process_init(cycle, worker); // worker 进程初始化
ngx_setproctitle("worker process");
for ( ;; ) {
(...) // 根据信号处理器处理 Exit 事件
ngx_process_events_and_timers(cycle); // 进入处理请求
(...) // 根据信号处理器处理 Terminate, Quit, Reopen 事件
}
首先分析使用 accept_mutex 所需的先决步骤,需要进行锁的获取,获取成功的 worker 才会被注册到 IO 事件系统中
// src/event/ngx_event.h
#define ngx_process_events ngx_event_actions.process_events
// src/event/ngx_event.c
void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
ngx_uint_t flags;
ngx_msec_t timer, delta;
// 获取定时器
if (ngx_timer_resolution) {
timer = NGX_TIMER_INFINITE;
flags = 0;
} else {
timer = ngx_event_find_timer();
flags = NGX_UPDATE_TIME;
#if (NGX_WIN32)
/* handle signals from master in case of network inactivity */
if (timer == NGX_TIMER_INFINITE || timer > 500) {
timer = 500;
}
#endif
}
if (ngx_use_accept_mutex) { // 使用 accept_mutex 的情况
if (ngx_accept_disabled > 0) {
ngx_accept_disabled--;
} else {
/*
尝试获取 accept mutex,只有成功获取锁的进程,才会将 listen 套接字注册到 IO 事件系统中。
因此,这就保证了只有一个进程拥有监听套接口,故所有进程阻塞在 epoll_wait 时,才不会惊群现象。
*/
if (ngx_trylock_accept_mutex(cycle) == NGX_ERROR) {
return;
}
/*
如果进程获得了锁,将添加一个 NGX_POST_EVENTS 标志。
这个标志的作用是将所有产生的事件放入一个队列中,等释放锁后,在慢慢来处理事件。
因为,处理请求可能会很耗时,如果不先施放锁再处理的话,该进程就长时间霸占了锁,
导致其他进程无法获取锁,这样 accept 的效率就低了。
*/
if (ngx_accept_mutex_held) {
flags |= NGX_POST_EVENTS;
} else {
// 没有获得锁得进程,设置 IO 等待超时时间,再去争抢锁。
if (timer == NGX_TIMER_INFINITE
|| timer > ngx_accept_mutex_delay)
{
timer = ngx_accept_mutex_delay;
}
}
}
}
(...)
}
// src/event/ngx_event_accept.c
ngx_int_t
ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
{
if (ngx_shmtx_trylock(&ngx_accept_mutex)) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"accept mutex locked");
if (ngx_accept_mutex_held && ngx_accept_events == 0) {
return NGX_OK;
}
// 注册 accept 事件
if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
ngx_shmtx_unlock(&ngx_accept_mutex);
return NGX_ERROR;
}
ngx_accept_events = 0;
ngx_accept_mutex_held = 1;
return NGX_OK;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"accept mutex lock failed: %ui", ngx_accept_mutex_held);
// 没有获取到锁,删除先前被注册的事件
if (ngx_accept_mutex_held) {
if (ngx_disable_accept_events(cycle, 0) == NGX_ERROR) {
return NGX_ERROR;
}
ngx_accept_mutex_held = 0;
}
return NGX_OK;
}
ngx_int_t
ngx_enable_accept_events(ngx_cycle_t *cycle)
{
ngx_uint_t i;
ngx_listening_t *ls;
ngx_connection_t *c;
ls = cycle->listening.elts;
for (i = 0; i < cycle->listening.nelts; i++) {
c = ls[i].connection;
if (c == NULL || c->read->active) {
continue;
}
// 注册事件
if (ngx_add_event(c->read, NGX_READ_EVENT, 0) == NGX_ERROR) {
return NGX_ERROR;
}
}
return NGX_OK;
}
static ngx_int_t
ngx_disable_accept_events(ngx_cycle_t *cycle, ngx_uint_t all)
{
ngx_uint_t i;
ngx_listening_t *ls;
ngx_connection_t *c;
ls = cycle->listening.elts;
for (i = 0; i < cycle->listening.nelts; i++) {
c = ls[i].connection;
if (c == NULL || !c->read->active) {
continue;
}
#if (NGX_HAVE_REUSEPORT)
/*
* do not disable accept on worker's own sockets
* when disabling accept events due to accept mutex
*/
if (ls[i].reuseport && !all) {
continue;
}
#endif
// 删除已注册事件
if (ngx_del_event(c->read, NGX_READ_EVENT, NGX_DISABLE_EVENT)
== NGX_ERROR)
{
return NGX_ERROR;
}
}
return NGX_OK;
}
// src/core/ngx_shmtx.c
ngx_uint_t
ngx_shmtx_trylock(ngx_shmtx_t *mtx)
{
// 通过 CAS 获取锁
return (*mtx->lock == 0 && ngx_atomic_cmp_set(mtx->lock, 0, ngx_pid));
}
接下来就是进行事件的等待和处理
// src/event/ngx_event.h
#define ngx_process_events ngx_event_actions.process_events
// src/event/ngx_event.c
void
ngx_process_events_and_timers(ngx_cycle_t *cycle)
{
ngx_uint_t flags;
ngx_msec_t timer, delta;
// 获取定时器
if (ngx_timer_resolution) {
timer = NGX_TIMER_INFINITE;
flags = 0;
} else {
timer = ngx_event_find_timer();
flags = NGX_UPDATE_TIME;
#if (NGX_WIN32)
/* handle signals from master in case of network inactivity */
if (timer == NGX_TIMER_INFINITE || timer > 500) {
timer = 500;
}
#endif
}
(...)
// 每次 eventloop 都会清空所有的事件队列
// ngx_posted_next_events, ngx_posted_accept_events, ngx_posted_events
// 如果 ngx_posted_next_events 队列中仍有未处理的事件,先行处理
if (!ngx_queue_empty(&ngx_posted_next_events)) {
ngx_event_move_posted_next(cycle);
timer = 0;
}
delta = ngx_current_msec;
(void) ngx_process_events(cycle, timer, flags); // 执行对应 IO 时间模型 process_events 方法
delta = ngx_current_msec - delta;
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"timer delta: %M", delta);
/*
ngx_posted_accept_events是一个事件队列,暂存 epoll 从监听套接口 wait 到的 accept 事件。
前文提到的 NGX_POST_EVENTS 标志被使用后,会将所有的 accept 事件暂存到这个队列
*/
ngx_event_process_posted(cycle, &ngx_posted_accept_events);
// 处理完事件后,释放锁
if (ngx_accept_mutex_held) {
ngx_shmtx_unlock(&ngx_accept_mutex);
}
// 处理超时的任务
ngx_event_expire_timers();
// 读写事件将会被添加到 ngx_posted_events 队列中
ngx_event_process_posted(cycle, &ngx_posted_events);
}
// src/event/ngx_event_posted.c
// 进行事件获取并调用事件对应 handle
void
ngx_event_process_posted(ngx_cycle_t *cycle, ngx_queue_t *posted)
{
ngx_queue_t *q;
ngx_event_t *ev;
while (!ngx_queue_empty(posted)) {
q = ngx_queue_head(posted);
ev = ngx_queue_data(q, ngx_event_t, queue);
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"posted event %p", ev);
ngx_delete_posted_event(ev);
ev->handler(ev);
}
}
// src/event/ngx_event_timer.c
// 处理超时任务
void
ngx_event_expire_timers(void)
{
sentinel = ngx_event_timer_rbtree.sentinel;
for ( ;; ) {
//1) 获取最近将要过期的红黑树节点
node = ngx_rbtree_min(root, sentinel);
if ((ngx_msec_int_t) (node->key - ngx_current_msec) > 0){
//2) 未过期,直接return返回
}
//3) 获得过期节点的ngx_event_t结构
ev = (ngx_event_t *) ((char *) node - offsetof(ngx_event_t, timer));
//4) 从红黑树中移除该过期事件
ngx_rbtree_delete(&ngx_event_timer_rbtree, &ev->timer);
//5) 调用定时器所绑定的handler回调函数
ev->timer_set = 0; //timer_set标志为置为0
ev->timedout = 1; // timeout标志为置为1,表示定时器已经超时
ev->handler(ev);
}
}
小结
Nginx 的高效是许多人有亲身体验的,不难看出,Nginx 内部为了更高效地处理请求,用到了很多复杂的数据结构和架构设计,也仰仗于 Linux 内核的不断发展,也使得 Nginx 效率不断提高。最后想吐槽一下,大佬们真的不爱写注释,作为小透明的我们表示很难受。