聊聊 EPoll - ET vs LT
背景
这里不对Epoll和I/O multiplexing 概念进行介绍。
先对比下ET vs LT在开源里的应用
- JDK Selector 默认LT
- EPollArrayWrapper.java
- EPollSelectotImpl.java
void initInterrupt(int fd0, int fd1) {
outgoingInterruptFD = fd1;
incomingInterruptFD = fd0;
epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
}
EPollSelectorImpl(SelectorProvider sp) throws IOException {
super(sp);
long pipeFds = IOUtil.makePipe(false);
fd0 = (int) (pipeFds >>> 32);
fd1 = (int) pipeFds;
try {
pollWrapper = new EPollArrayWrapper();
pollWrapper.initInterrupt(fd0, fd1);
fdToKey = new HashMap<>();
} catch (Throwable t) {
try {
FileDispatcherImpl.closeIntFD(fd0);
} catch (IOException ioe0) {
t.addSuppressed(ioe0);
}
try {
FileDispatcherImpl.closeIntFD(fd1);
} catch (IOException ioe1) {
t.addSuppressed(ioe1);
}
throw t;
}
}
- Redis 默认LT ae_poll.c
- 主打一个实现简洁,有利于维护
- Redis 操作大都是小块数据,LT模式下,内核 监听FD status变化反复通知的问题较小。比如小块数据,非常容易一次read结束,减少了反复wakeup的问题
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
- Nginx Accept 默认LT,Read/Write 使用ET
- ngx_epoll_module.c ngx_event_accept.c
- Nginx webserver应用场景,针对高并发的http请求,数据传输相对redis更大,ET模式只通知一次效率更好
ngx_epoll_add_connection(ngx_connection_t *c)
{
struct epoll_event ee;
ee.events = EPOLLIN|EPOLLOUT|EPOLLET;
ee.data.ptr = (void *) ((uintptr_t) c | c->read->instance);
ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
"epoll add connection: fd:%d ev:%08XD", c->fd, ee.events);
if (epoll_ctl(ep, EPOLL_CTL_ADD, c->fd, &ee) == -1) {
ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
"epoll_ctl(EPOLL_CTL_ADD, %d) failed", c->fd);
return NGX_ERROR;
}
c->read->active = 1;
c->write->active = 1;
return NGX_OK;
}
接下来重点介绍下本文的重点关注: Epoll scaling out带来的问题
分为两类:
- accept() scaling out问题
- read() scaling out问题
Scaling out accept
场景:有时需要处理大量非常短暂的 TCP 连接,例如高吞吐量的HTTP服务器。由于入站连接的速率很高,希望将 accept()连接的工作分布到多个CPU core 上
Level triggered 模式
多线程epoll_wait,试想如下应用:
- 内核:接收到一个新的connection请求
- 内核:通知两个等待中的线程 A 和 B。由于LT通知带来的“惊群”行为,内核必须wake up两个线程
- 线程 A:完成 epoll_wait()
- 线程 B:完成 epoll_wait()
- 线程 A:执行 accept(),成功
- 线程 B:执行 accept(),返回 EAGAIN 错误
wake up 线程B浪费了系统资源,当有更多的threads会更加浪费
Edge triggered 模式
多线程模式下是不是ET更好呢?
- 内核:接收到第一个连接。有两个线程 A 和 B 在等待。由于ET的行为,只通知其中一个 - 假设是线程 A
- 线程 A:完成 epoll_wait()
- 线程 A:执行 accept(),成功。到这里一切正常
- 内核:accept queue为空,事件触发的套接字从“可读”移到“不可读”,因此内核必须重新激活FD
- 内核:接收到第二个连接
- 内核:当前只有一个线程在 epoll_wait() 上等待,内核wake up线程 B
- 线程 A:由于它不知道内核最初是否接收了一个或多个连接,必须执行 accept()。它希望得到 EAGAIN,但却得到另一个套接字
- 线程 B:执行 accept(),期望得到连接,却收到 EAGAIN
- 线程 A:必须再次执行 accept(),得到 EAGAIN
依然会存在资源浪费的问题。 除此之外,还存在线程饥饿的问题
- 内核:接收到两个连接。有两个线程 A 和 B 在等待。由于“边缘触发”的行为,只通知其中一个 - 假设是线程 A
- 线程 A:完成 epoll_wait()
- 线程 A:执行 accept(),成功
- 内核:接收到第三个连接。该套接字处于“可读”状态,仍然是“可读”状态。由于我们处于ET触发模式,不会触发事件
- 线程 A:必须执行 accept(),希望得到 EAGAIN,但得到另一个套接字
- 内核:接收到第四个连接
- 线程 A:必须执行 accept(),希望得到 EAGAIN,但得到另一个套接字
Epoll的复杂性提升了使用者的难度。不熟悉最好不要乱碰
正确的使用方式:
Level triggered
- EPOLLEXCLUSIVE 使用
- 保证只有一个线程被wake up,防止在多core下的惊群效应
- Nginx早起的多进程模式,使用的是Mutex
ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
{
if (ngx_shmtx_trylock(&ngx_accept_mutex)) {
ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"accept mutex locked");
if (ngx_accept_mutex_held
&& ngx_accept_events == 0
&& !(ngx_event_flags & NGX_USE_RTSIG_EVENT))
{
return NGX_OK;
}
if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
ngx_shmtx_unlock(&ngx_accept_mutex);
return NGX_ERROR;
}
ngx_accept_events = 0;
ngx_accept_mutex_held = 1;
return NGX_OK;
}
ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
"accept mutex lock failed: %ui", ngx_accept_mutex_held);
if (ngx_accept_mutex_held) {
if (ngx_disable_accept_events(cycle) == NGX_ERROR) {
return NGX_ERROR;
}
ngx_accept_mutex_held = 0;
}
return NGX_OK;
}
需要注意的是,nginx不会让子进程一直偏好被分配。之前获得锁的,操作资源的使用上线,会放弃锁,而进行负载均衡 ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n;
Edge triggered
-
EPOLLONESHOT 使用
-
额外工作
- 每个事件后需要额外的 epoll_ctl() 系统调用
- 内核:接收到两个连接。有两个线程 A 和 B 在等待。由于“边缘触发”的行为,只通知其中一个 - 假设是线程 A
- 线程 A:完成 epoll_wait()
- 线程 A:执行 accept(),成功
- 线程 A:执行 epoll_ctl(EPOLL_CTL_MOD),这将重置 EPOLLONESHOT 并重新激活套接字
Scaling out read
背景
想象一下,当你有大量的 HTTP 客户端连接并且希望尽快处理它们时。每个连接可能需要一些不可预测的处理,使用“合并队列”排队模型 - 拥有一个 epoll 集并使用多个线程拉取活动套接字并执行工作
level triggered
除了惊群效应之外,还存在Race condition
- 内核:接收到 2047 bytes的数据
- 内核:两个线程正在 epoll 上等待,由于 EPOLLEXCLUSIVE 行为,内核wake up了线程 A
- 线程 A:完成 epoll_wait()
- 内核:接收到 2 bytes的数据
- 内核:只有一个线程在 epoll 上等待,内核wake up线程 B
- 线程 A:执行 read(2048) 并读取了完整的 2048 字节缓冲区
- 线程 B:执行 read(2048) 并读取了剩余的 1 字节数据
edge triggered
依然存在Race condition
- 内核:接收到 2048 字节的数据。
- 内核:有两个线程 A 和 B 在等待数据。由于ET触发的行为,只有其中一个被通知
- 线程 A:完成 epoll_wait()
- 线程 A:执行 read(2048) 并读取了完整的 2048 字节缓冲区
- 内核:缓冲区为空,因此内核重新激活文件描述符
- 内核:接收到 1 字节的数据
- 内核:当前只有一个线程在 epoll_wait() 上等待,唤醒线程 B
- 线程 B:完成 epoll_wait()
- 线程 B:执行 read(2048) 并获取了 1 字节的数据
- 线程 A:重试 read(2048),返回空,并得到 EAGAIN
正确的方式:
- 使用EPOLLONESHOT
- 额外工作,重新激活FD
- jvm 代码示例EPollPort.java
@Override
void startPoll(int fd, int events) {
// update events (or add to epoll on first usage)
int err = epollCtl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
if (err == ENOENT)
err = epollCtl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
if (err != 0)
throw new AssertionError(); // should not happen
}
- redis 单线程模型,代码简洁很多,不需要如此处理
- Nginx 代码就复杂很多
ngx_handle_read_event(ngx_event_t *rev, ngx_uint_t flags)
{
if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {
/* kqueue, epoll */
if (!rev->active && !rev->ready) {
if (ngx_add_event(rev, NGX_READ_EVENT, NGX_CLEAR_EVENT)
== NGX_ERROR)
{
return NGX_ERROR;
}
}
return NGX_OK;
} else if (ngx_event_flags & NGX_USE_LEVEL_EVENT) {
/* select, poll, /dev/poll */
if (!rev->active && !rev->ready) {
if (ngx_add_event(rev, NGX_READ_EVENT, NGX_LEVEL_EVENT)
== NGX_ERROR)
{
return NGX_ERROR;
}
return NGX_OK;
}
if (rev->active && (rev->ready || (flags & NGX_CLOSE_EVENT))) {
if (ngx_del_event(rev, NGX_READ_EVENT, NGX_LEVEL_EVENT | flags)
== NGX_ERROR)
{
return NGX_ERROR;
}
return NGX_OK;
}
} else if (ngx_event_flags & NGX_USE_EVENTPORT_EVENT) {
/* event ports */
if (!rev->active && !rev->ready) {
if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
return NGX_ERROR;
}
return NGX_OK;
}
if (rev->oneshot && !rev->ready) {
if (ngx_del_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
return NGX_ERROR;
}
return NGX_OK;
}
}
/* aio, iocp, rtsig */
return NGX_OK;
}
见👆的oneshot位置
总结
正确使用 epoll() 是困难的。理解额外的标志 EPOLLONESHOT 和 EPOLLEXCLUSIVE,谨慎使用
当然,最好的方式还是使用成熟的开源软件