聊聊 EPoll - ET vs LT

背景

这里不对Epoll和I/O multiplexing 概念进行介绍。

先对比下ET vs LT在开源里的应用

  • JDK Selector 默认LT
  • EPollArrayWrapper.java
  • EPollSelectotImpl.java
     void initInterrupt(int fd0, int fd1) {
        outgoingInterruptFD = fd1;
        incomingInterruptFD = fd0;
        epollCtl(epfd, EPOLL_CTL_ADD, fd0, EPOLLIN);
    }
    EPollSelectorImpl(SelectorProvider sp) throws IOException {
        super(sp);
        long pipeFds = IOUtil.makePipe(false);
        fd0 = (int) (pipeFds >>> 32);
        fd1 = (int) pipeFds;
        try {
            pollWrapper = new EPollArrayWrapper();
            pollWrapper.initInterrupt(fd0, fd1);
            fdToKey = new HashMap<>();
        } catch (Throwable t) {
            try {
                FileDispatcherImpl.closeIntFD(fd0);
            } catch (IOException ioe0) {
                t.addSuppressed(ioe0);
            }
            try {
                FileDispatcherImpl.closeIntFD(fd1);
            } catch (IOException ioe1) {
                t.addSuppressed(ioe1);
            }
            throw t;
        }
    }
  • Redis 默认LT ae_poll.c
  • 主打一个实现简洁,有利于维护
  • Redis 操作大都是小块数据,LT模式下,内核 监听FD status变化反复通知的问题较小。比如小块数据,非常容易一次read结束,减少了反复wakeup的问题
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}
  • Nginx Accept 默认LT,Read/Write 使用ET
  • ngx_epoll_module.c ngx_event_accept.c
  • Nginx webserver应用场景,针对高并发的http请求,数据传输相对redis更大,ET模式只通知一次效率更好
ngx_epoll_add_connection(ngx_connection_t *c)
{
    struct epoll_event  ee;

    ee.events = EPOLLIN|EPOLLOUT|EPOLLET;
    ee.data.ptr = (void *) ((uintptr_t) c | c->read->instance);

    ngx_log_debug2(NGX_LOG_DEBUG_EVENT, c->log, 0,
                   "epoll add connection: fd:%d ev:%08XD", c->fd, ee.events);

    if (epoll_ctl(ep, EPOLL_CTL_ADD, c->fd, &ee) == -1) {
        ngx_log_error(NGX_LOG_ALERT, c->log, ngx_errno,
                      "epoll_ctl(EPOLL_CTL_ADD, %d) failed", c->fd);
        return NGX_ERROR;
    }

    c->read->active = 1;
    c->write->active = 1;

    return NGX_OK;
}

接下来重点介绍下本文的重点关注: Epoll scaling out带来的问题

分为两类:

  • accept() scaling out问题
  • read() scaling out问题

Scaling out accept

场景:有时需要处理大量非常短暂的 TCP 连接,例如高吞吐量的HTTP服务器。由于入站连接的速率很高,希望将 accept()连接的工作分布到多个CPU core 上

Level triggered 模式

多线程epoll_wait,试想如下应用:

  1. 内核:接收到一个新的connection请求
  2. 内核:通知两个等待中的线程 A 和 B。由于LT通知带来的“惊群”行为,内核必须wake up两个线程
  3. 线程 A:完成 epoll_wait()
  4. 线程 B:完成 epoll_wait()
  5. 线程 A:执行 accept(),成功
  6. 线程 B:执行 accept(),返回 EAGAIN 错误

wake up 线程B浪费了系统资源,当有更多的threads会更加浪费

Edge triggered 模式

多线程模式下是不是ET更好呢?

  1. 内核:接收到第一个连接。有两个线程 A 和 B 在等待。由于ET的行为,只通知其中一个 - 假设是线程 A
  2. 线程 A:完成 epoll_wait()
  3. 线程 A:执行 accept(),成功。到这里一切正常
  4. 内核:accept queue为空,事件触发的套接字从“可读”移到“不可读”,因此内核必须重新激活FD
  5. 内核:接收到第二个连接
  6. 内核:当前只有一个线程在 epoll_wait() 上等待,内核wake up线程 B
  7. 线程 A:由于它不知道内核最初是否接收了一个或多个连接,必须执行 accept()。它希望得到 EAGAIN,但却得到另一个套接字
  8. 线程 B:执行 accept(),期望得到连接,却收到 EAGAIN
  9. 线程 A:必须再次执行 accept(),得到 EAGAIN

依然会存在资源浪费的问题。 除此之外,还存在线程饥饿的问题

  1. 内核:接收到两个连接。有两个线程 A 和 B 在等待。由于“边缘触发”的行为,只通知其中一个 - 假设是线程 A
  2. 线程 A:完成 epoll_wait()
  3. 线程 A:执行 accept(),成功
  4. 内核:接收到第三个连接。该套接字处于“可读”状态,仍然是“可读”状态。由于我们处于ET触发模式,不会触发事件
  5. 线程 A:必须执行 accept(),希望得到 EAGAIN,但得到另一个套接字
  6. 内核:接收到第四个连接
  7. 线程 A:必须执行 accept(),希望得到 EAGAIN,但得到另一个套接字

Epoll的复杂性提升了使用者的难度。不熟悉最好不要乱碰

正确的使用方式:

Level triggered

  • EPOLLEXCLUSIVE 使用
  • 保证只有一个线程被wake up,防止在多core下的惊群效应
  • Nginx早起的多进程模式,使用的是Mutex
ngx_trylock_accept_mutex(ngx_cycle_t *cycle)
{
    if (ngx_shmtx_trylock(&ngx_accept_mutex)) {

        ngx_log_debug0(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                       "accept mutex locked");

        if (ngx_accept_mutex_held
            && ngx_accept_events == 0
            && !(ngx_event_flags & NGX_USE_RTSIG_EVENT))
        {
            return NGX_OK;
        }

        if (ngx_enable_accept_events(cycle) == NGX_ERROR) {
            ngx_shmtx_unlock(&ngx_accept_mutex);
            return NGX_ERROR;
        }

        ngx_accept_events = 0;
        ngx_accept_mutex_held = 1;

        return NGX_OK;
    }

    ngx_log_debug1(NGX_LOG_DEBUG_EVENT, cycle->log, 0,
                   "accept mutex lock failed: %ui", ngx_accept_mutex_held);

    if (ngx_accept_mutex_held) {
        if (ngx_disable_accept_events(cycle) == NGX_ERROR) {
            return NGX_ERROR;
        }

        ngx_accept_mutex_held = 0;
    }

    return NGX_OK;
}

需要注意的是,nginx不会让子进程一直偏好被分配。之前获得锁的,操作资源的使用上线,会放弃锁,而进行负载均衡 ngx_accept_disabled = ngx_cycle->connection_n / 8 - ngx_cycle->free_connection_n;

Edge triggered

  • EPOLLONESHOT 使用

  • 额外工作

    • 每个事件后需要额外的 epoll_ctl() 系统调用
  1. 内核:接收到两个连接。有两个线程 A 和 B 在等待。由于“边缘触发”的行为,只通知其中一个 - 假设是线程 A
  2. 线程 A:完成 epoll_wait()
  3. 线程 A:执行 accept(),成功
  4. 线程 A:执行 epoll_ctl(EPOLL_CTL_MOD),这将重置 EPOLLONESHOT 并重新激活套接字

Scaling out read

背景

想象一下,当你有大量的 HTTP 客户端连接并且希望尽快处理它们时。每个连接可能需要一些不可预测的处理,使用“合并队列”排队模型 - 拥有一个 epoll 集并使用多个线程拉取活动套接字并执行工作

level triggered

除了惊群效应之外,还存在Race condition

  1. 内核:接收到 2047 bytes的数据
  2. 内核:两个线程正在 epoll 上等待,由于 EPOLLEXCLUSIVE 行为,内核wake up了线程 A
  3. 线程 A:完成 epoll_wait()
  4. 内核:接收到 2 bytes的数据
  5. 内核:只有一个线程在 epoll 上等待,内核wake up线程 B
  6. 线程 A:执行 read(2048) 并读取了完整的 2048 字节缓冲区
  7. 线程 B:执行 read(2048) 并读取了剩余的 1 字节数据

edge triggered

依然存在Race condition

  1. 内核:接收到 2048 字节的数据。
  2. 内核:有两个线程 A 和 B 在等待数据。由于ET触发的行为,只有其中一个被通知
  3. 线程 A:完成 epoll_wait()
  4. 线程 A:执行 read(2048) 并读取了完整的 2048 字节缓冲区
  5. 内核:缓冲区为空,因此内核重新激活文件描述符
  6. 内核:接收到 1 字节的数据
  7. 内核:当前只有一个线程在 epoll_wait() 上等待,唤醒线程 B
  8. 线程 B:完成 epoll_wait()
  9. 线程 B:执行 read(2048) 并获取了 1 字节的数据
  10. 线程 A:重试 read(2048),返回空,并得到 EAGAIN

正确的方式:

  • 使用EPOLLONESHOT
  • 额外工作,重新激活FD
  • jvm 代码示例EPollPort.java
 @Override
    void startPoll(int fd, int events) {
        // update events (or add to epoll on first usage)
        int err = epollCtl(epfd, EPOLL_CTL_MOD, fd, (events | EPOLLONESHOT));
        if (err == ENOENT)
            err = epollCtl(epfd, EPOLL_CTL_ADD, fd, (events | EPOLLONESHOT));
        if (err != 0)
            throw new AssertionError();     // should not happen
    }
  • redis 单线程模型,代码简洁很多,不需要如此处理
  • Nginx 代码就复杂很多
ngx_handle_read_event(ngx_event_t *rev, ngx_uint_t flags)
{
    if (ngx_event_flags & NGX_USE_CLEAR_EVENT) {

        /* kqueue, epoll */

        if (!rev->active && !rev->ready) {
            if (ngx_add_event(rev, NGX_READ_EVENT, NGX_CLEAR_EVENT)
                == NGX_ERROR)
            {
                return NGX_ERROR;
            }
        }

        return NGX_OK;

    } else if (ngx_event_flags & NGX_USE_LEVEL_EVENT) {

        /* select, poll, /dev/poll */

        if (!rev->active && !rev->ready) {
            if (ngx_add_event(rev, NGX_READ_EVENT, NGX_LEVEL_EVENT)
                == NGX_ERROR)
            {
                return NGX_ERROR;
            }

            return NGX_OK;
        }

        if (rev->active && (rev->ready || (flags & NGX_CLOSE_EVENT))) {
            if (ngx_del_event(rev, NGX_READ_EVENT, NGX_LEVEL_EVENT | flags)
                == NGX_ERROR)
            {
                return NGX_ERROR;
            }

            return NGX_OK;
        }

    } else if (ngx_event_flags & NGX_USE_EVENTPORT_EVENT) {

        /* event ports */

        if (!rev->active && !rev->ready) {
            if (ngx_add_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
                return NGX_ERROR;
            }

            return NGX_OK;
        }

        if (rev->oneshot && !rev->ready) {
            if (ngx_del_event(rev, NGX_READ_EVENT, 0) == NGX_ERROR) {
                return NGX_ERROR;
            }

            return NGX_OK;
        }
    }

    /* aio, iocp, rtsig */

    return NGX_OK;
}

见👆的oneshot位置

总结

正确使用 epoll() 是困难的。理解额外的标志 EPOLLONESHOT 和 EPOLLEXCLUSIVE,谨慎使用

当然,最好的方式还是使用成熟的开源软件