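Redis Source Code - Event Handling, Part 1

Background

To get a deeper understanding of Linux epoll I/O handling and Linux sockets, I wanted a real codebase to study. After comparing the options, Redis turned out to be the best fit.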

  • Why Redis

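    • Redis has far less code than netty or nginx; simplicity is beauty 😄

    • Plenty of comments

    • Extra payoff

      • A chance to learn how replication is done
      • A chance to learn leader election
  • How

    • Redis source, v3.2.13

      • Starting simple rather than from the latest code
    • Debugging with GDB

    • ChatGPT as a code explainer: I take its answers back to the code to verify my own understanding

    • Linux 4.4.0 kernel source analysis: TCP implementation (PDF)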

TL;DR: Redis event handling

First, here is the GDB stack of redis-server while it handles set key1 v1. Read it from the bottom up.

Thread 1 "redis-server" hit Breakpoint 1, dictAdd (d=0x7ffff6a181e0, key=0x7ffff6a16701, val=0x7ffff6a5a200) at dict.c:324
(gdb) bt
#0  dictAdd (d=0x7ffff6a181e0, key=0x7ffff6a16701, val=0x7ffff6a5a200) at dict.c:324
#1  0x000000000043f179 in dbAdd (db=0x7ffff6a27800, key=0x7ffff6a5a218, val=0x7ffff6a5a200) at db.c:159
#2  0x000000000043f2af in setKey (db=0x7ffff6a27800, key=0x7ffff6a5a218, val=0x7ffff6a5a200) at db.c:186
#3  0x000000000044b873 in setGenericCommand (c=0x7ffff6b4e680, flags=0, key=0x7ffff6a5a218, val=0x7ffff6a5a200, expire=0x0, unit=0, ok_reply=0x0, abort_reply=0x0) at t_string.c:86
#4  0x000000000044bbc6 in setCommand (c=0x7ffff6b4e680) at t_string.c:139
#5  0x0000000000429c9d in call (c=0x7ffff6b4e680, flags=15) at server.c:2265
#6  0x000000000042a7e9 in processCommand (c=0x7ffff6b4e680) at server.c:2544
#7  0x000000000043a377 in processInputBuffer (c=0x7ffff6b4e680) at networking.c:1311
#8  0x000000000043a668 in readQueryFromClient (el=0x7ffff6a2f0a0, fd=5, privdata=0x7ffff6b4e680, mask=1) at networking.c:1375
#9  0x0000000000421c85 in aeProcessEvents (eventLoop=0x7ffff6a2f0a0, flags=3) at ae.c:431
#10 0x0000000000421e68 in aeMain (eventLoop=0x7ffff6a2f0a0) at ae.c:489
#11 0x000000000042e591 in main (argc=2, argv=0x7fffffffe428) at server.c:4133

aeMain is the entry point of the whole event loop

void aeMain(aeEventLoop *eventLoop) {
    eventLoop->stop = 0;
    while (!eventLoop->stop) {
        if (eventLoop->beforesleep != NULL)
            eventLoop->beforesleep(eventLoop);
        aeProcessEvents(eventLoop, AE_ALL_EVENTS);
    }
}

aeProcessEvents holds the processing logic

/* Process every pending time event, then every pending file event
 * (that may be registered by time event callbacks just processed).
 * Without special flags the function sleeps until some file event
 * fires, or when the next time event occurs (if any).
 *
 * If flags is 0, the function does nothing and returns.
 * if flags has AE_ALL_EVENTS set, all the kind of events are processed.
 * if flags has AE_FILE_EVENTS set, file events are processed.
 * if flags has AE_TIME_EVENTS set, time events are processed.
 * if flags has AE_DONT_WAIT set the function returns ASAP until all
 * the events that's possible to process without to wait are processed.
 *
 * The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)

aeEventLoop is the struct that holds the registered file events and time events, along with the context needed to run them

typedef struct aeEventLoop {
    int maxfd;   /* highest file descriptor currently registered */
    int setsize; /* max number of file descriptors tracked */
    long long timeEventNextId;
    time_t lastTime;     /* Used to detect system clock skew */
    aeFileEvent *events; /* Registered events */
    aeFiredEvent *fired; /* Fired events */
    aeTimeEvent *timeEventHead;
    int stop;
    void *apidata; /* This is used for polling API specific data */
    aeBeforeSleepProc *beforesleep;
} aeEventLoop;
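
  • aeFileEvent represents a file event; file events are processed when flags contains AE_FILE_EVENTS

  • aeTimeEvent represents a time event; time events are processed when flags contains AE_TIME_EVENTS

  • AE_DONT_WAIT is a flag of aeProcessEvents rather than an event type: when it is set, the call handles whatever is already ready and returns without blocking

  • aeFiredEvent represents a fired (ready) event

    • On Linux, ready events are obtained through epoll_wait, and each one is stored as an entry in the aeFiredEvent array (eventLoop->fired)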

Overall, Redis handles two kinds of events: file events and time events

File events

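  1. Redis first finds the nearest time event (shortest) and from it computes how long it may wait, tvp
  2. aeApiPoll(eventLoop, tvp) calls epoll_wait to collect the fired events
  3. The read/write handlers registered for those events are invoked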
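
Step 1 can be illustrated with a small sketch. It is not the Redis implementation: demoTimer, demoNearestTimer and demoPollTimeoutMs are hypothetical names, and the clock is passed in as plain numbers so the arithmetic is easy to follow. The idea is only that the poll timeout equals the distance to the earliest deadline, clamped at 0, and that "no timer at all" means the poll may block until I/O arrives.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-in for a time event's deadline (not Redis's aeTimeEvent). */
typedef struct demoTimer {
    long when_sec;
    long when_ms;
    struct demoTimer *next;
} demoTimer;

/* Linear scan for the earliest deadline; returns NULL for an empty list. */
demoTimer *demoNearestTimer(demoTimer *head) {
    demoTimer *nearest = NULL;
    demoTimer *t;
    for (t = head; t != NULL; t = t->next) {
        if (nearest == NULL ||
            t->when_sec < nearest->when_sec ||
            (t->when_sec == nearest->when_sec && t->when_ms < nearest->when_ms))
            nearest = t;
    }
    return nearest;
}

/* Milliseconds the poll call may block.
 * -1: no timer registered, block until some I/O event fires.
 *  0: the nearest timer is already due, do not block. */
long demoPollTimeoutMs(demoTimer *head, long now_sec, long now_ms) {
    demoTimer *nearest = demoNearestTimer(head);
    long diff;
    if (nearest == NULL) return -1;
    diff = (nearest->when_sec - now_sec) * 1000 + (nearest->when_ms - now_ms);
    return diff < 0 ? 0 : diff;
}
```

With two timers due at 15.000 s and 12.250 s and the clock at 10.500 s, the sketch yields a timeout of 1750 ms, which is the distance to the earlier of the two.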

The GDB call stack from redis-server makes this more concrete

(gdb) bt
#0  0x00007ffff73620f7 in epoll_wait () from /lib64/libc.so.6
#1  0x0000000000420fdd in aeApiPoll (eventLoop=0x7ffff6a2f0a0, tvp=0x7fffffffe260) at ae_epoll.c:112
#2  0x0000000000421bd0 in aeProcessEvents (eventLoop=0x7ffff6a2f0a0, flags=3) at ae.c:404
#3  0x0000000000421e68 in aeMain (eventLoop=0x7ffff6a2f0a0) at ae.c:489
#4  0x000000000042e591 in main (argc=2, argv=0x7fffffffe428) at server.c:4133

Below is the rough processing logic, with some comments removed

        numevents = aeApiPoll(eventLoop, tvp);
        for (j = 0; j < numevents; j++) {
            aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
            int mask = eventLoop->fired[j].mask;
            int fd = eventLoop->fired[j].fd;
            int fired = 0; /* Number of events fired for current fd. */
            int invert = fe->mask & AE_BARRIER;

            if (!invert && fe->mask & mask & AE_READABLE) {
                fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                fired++;
            }

            /* Fire the writable event. */
            if (fe->mask & mask & AE_WRITABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->wfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            /* If we have to invert the call, fire the readable event now
             * after the writable one. */
            if (invert && fe->mask & mask & AE_READABLE) {
                if (!fired || fe->wfileProc != fe->rfileProc) {
                    fe->rfileProc(eventLoop,fd,fe->clientData,mask);
                    fired++;
                }
            }

            processed++;
        }

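Note: AE_BARRIER inverts the usual order so that the writable handler runs before the readable one. Without it, the readable handler runs first and the writable one second. AE_BARRIER appears to have been added after the 3.2 series, so this excerpt is likely from a newer ae.c than the 3.2.13 used for the stack traces.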
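
To make the ordering rules easier to check, here is a minimal sketch that mirrors the three if-blocks of the dispatch loop but records which handler would run instead of calling it. The DEMO_* constants and demoDispatchOrder are hypothetical names chosen for this illustration; same_handler stands for the case where wfileProc == rfileProc.

```c
#include <assert.h>

#define DEMO_READABLE 1
#define DEMO_WRITABLE 2
#define DEMO_BARRIER  4

/* Writes the handlers that would run, in order, into out as 'R' / 'W'
 * followed by a terminating NUL. out must hold at least 3 bytes.
 * Returns the number of handlers fired. */
int demoDispatchOrder(int registered, int fired_mask, int same_handler, char *out) {
    int fired = 0;
    int invert = registered & DEMO_BARRIER;

    assert(out != 0);
    /* Normal case: readable first. */
    if (!invert && (registered & fired_mask & DEMO_READABLE))
        out[fired++] = 'R';

    /* Writable: skipped if the same handler already ran for this fd. */
    if (registered & fired_mask & DEMO_WRITABLE) {
        if (!fired || !same_handler) out[fired++] = 'W';
    }

    /* Barrier case: readable runs after writable. */
    if (invert && (registered & fired_mask & DEMO_READABLE)) {
        if (!fired || !same_handler) out[fired++] = 'R';
    }
    out[fired] = '\0';
    return fired;
}
```

Calling it with both events ready gives "RW" without the barrier bit and "WR" with it; when a single function is registered for both directions, it runs only once per iteration.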

If a file event has a read or write handler registered, the corresponding handler is executed. The details are not expanded here.

fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fe->wfileProc(eventLoop,fd,fe->clientData,mask);

Here mask carries the type of the event that fired

So what does the actual handling look like? Taking a read event as an example

void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask);
👇
void processInputBuffer(client *c);
👇
int processCommand(client *c);
......

How does Redis detect that these events have happened?

Through the epoll_wait call

static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
    aeApiState *state = eventLoop->apidata;
    int retval, numevents = 0;

    retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
            tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
    if (retval > 0) {
        int j;

        numevents = retval;
        for (j = 0; j < numevents; j++) {
            int mask = 0;
            struct epoll_event *e = state->events+j;

            if (e->events & EPOLLIN) mask |= AE_READABLE;
            if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
            if (e->events & EPOLLERR) mask |= AE_WRITABLE;
            if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
            eventLoop->fired[j].fd = e->data.fd;
            eventLoop->fired[j].mask = mask;
        }
    }
    return numevents;
}
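
For readers who have not used epoll directly, the following standalone sketch shows the same register-then-wait pattern on a pipe instead of a client socket. It is Linux-only and is not taken from Redis: demoEpollPipe is a hypothetical helper, and it uses epoll_create1(0) simply as a convenient way to obtain an epoll instance.

```c
#include <assert.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

/* Hypothetical demo helper (not part of Redis): registers the read end of a
 * pipe with epoll, makes it readable, and polls once.
 * Returns the number of ready descriptors, or -1 if setup failed.
 * On success *events_out receives the event bits of the first ready entry. */
int demoEpollPipe(unsigned int *events_out) {
    int pipefd[2];
    int epfd = -1;
    int n = -1;
    struct epoll_event ev, ready[4];

    assert(events_out != NULL);
    if (pipe(pipefd) == -1) return -1;

    epfd = epoll_create1(0);
    if (epfd == -1) goto cleanup;

    memset(&ev, 0, sizeof(ev));
    ev.events = EPOLLIN;        /* interested in "readable" only */
    ev.data.fd = pipefd[0];     /* handed back to us by epoll_wait */
    if (epoll_ctl(epfd, EPOLL_CTL_ADD, pipefd[0], &ev) == -1) goto cleanup;

    /* Make the read end ready by writing one byte to the other end. */
    if (write(pipefd[1], "x", 1) != 1) goto cleanup;

    n = epoll_wait(epfd, ready, 4, 1000);   /* block for at most 1000 ms */
    if (n > 0) *events_out = ready[0].events;

cleanup:
    if (epfd != -1) close(epfd);
    close(pipefd[0]);
    close(pipefd[1]);
    return n;
}
```

The EPOLLIN bit reported here is what aeApiPoll translates into AE_READABLE before storing the entry in eventLoop->fired.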

Time events

Redis has two kinds of time events

  • Periodic timed events
  • Non-periodic timed events, which run only once

Printing the execution stack with GDB:

(gdb) bt
#0  serverCron (eventLoop=0x7ffff6a2f0a0, id=0, clientData=0x0) at server.c:1110
#1  0x00000000004219c8 in processTimeEvents (eventLoop=0x7ffff6a2f0a0) at ae.c:326
#2  0x0000000000421d4f in aeProcessEvents (eventLoop=0x7ffff6a2f0a0, flags=3) at ae.c:457
#3  0x0000000000421e68 in aeMain (eventLoop=0x7ffff6a2f0a0) at ae.c:489
#4  0x000000000042e591 in main (argc=2, argv=0x7fffffffe428) at server.c:4133

The core processing is as follows

/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
    int processed = 0;
    aeTimeEvent *te, *prev;
    long long maxId;
    time_t now = time(NULL);

    if (now < eventLoop->lastTime) {
        te = eventLoop->timeEventHead;
        while(te) {
            te->when_sec = 0;
            te = te->next;
        }
    }
    eventLoop->lastTime = now;

    prev = NULL;
    te = eventLoop->timeEventHead;
    maxId = eventLoop->timeEventNextId-1;
    while(te) {
        long now_sec, now_ms;
        long long id;

        /* Remove events scheduled for deletion. */
        if (te->id == AE_DELETED_EVENT_ID) {
            aeTimeEvent *next = te->next;
            if (prev == NULL)
                eventLoop->timeEventHead = te->next;
            else
                prev->next = te->next;
            if (te->finalizerProc)
                te->finalizerProc(eventLoop, te->clientData);
            zfree(te);
            te = next;
            continue;
        }

        if (te->id > maxId) {
            te = te->next;
            continue;
        }
        aeGetTime(&now_sec, &now_ms);
        if (now_sec > te->when_sec ||
            (now_sec == te->when_sec && now_ms >= te->when_ms))
        {
            int retval;

            id = te->id;
            retval = te->timeProc(eventLoop, id, te->clientData);
            processed++;
            if (retval != AE_NOMORE) {
                aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
            } else {
                te->id = AE_DELETED_EVENT_ID;
            }
        }
        prev = te;
        te = te->next;
    }
    return processed;
}

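Flow

  1. If the system clock has been moved backwards (for example, it was set in the future and then corrected), which is detected by now < eventLoop->lastTime, every time event has its when_sec reset to 0 so that it fires as soon as possible
  2. Walk the linked list eventLoop->timeEventHead one event at a time
  3. Remove time events already marked as deleted
  4. For an event that is due, call te->timeProc(eventLoop, id, te->clientData), which returns an int, retval
  5. retval == AE_NOMORE (-1): the event must not fire again and has to be deleted. Its id is set to AE_DELETED_EVENT_ID, and the node is removed the next time aeProcessEvents runs processTimeEvents
  6. retval != AE_NOMORE: the event must fire again. retval is treated as an interval in milliseconds and added to the current time to give the new trigger time, so the event is handled again in a later loop iteration (since the poll timeout is derived from the nearest time event, the loop wakes up when there is work to do rather than spinning)
  7. For the server's main timer, timeProc points to serverCron, which ends with return 1000/server.hz; hz is set in the Redis config file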
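
The periodic versus one-shot distinction in the steps above comes down to the handler's return value. The sketch below isolates just that rule on a fake millisecond clock. All names (demoEvent, demoFireIfDue, demoCron, demoOnce) are hypothetical, and hz is assumed to be 10 purely for the example.

```c
#include <assert.h>
#include <stddef.h>

#define DEMO_NOMORE     (-1)  /* plays the role of AE_NOMORE */
#define DEMO_DELETED_ID (-1)  /* plays the role of AE_DELETED_EVENT_ID */

typedef struct demoEvent {
    long long id;
    long long when_ms;                  /* absolute deadline on a fake clock */
    int (*proc)(struct demoEvent *ev);  /* returns next interval in ms, or DEMO_NOMORE */
    int runs;                           /* how many times proc was called */
} demoEvent;

/* Runs the event if it is due at now_ms; returns 1 if the handler ran. */
int demoFireIfDue(demoEvent *ev, long long now_ms) {
    int retval;
    assert(ev != NULL && ev->proc != NULL);
    if (ev->id == DEMO_DELETED_ID || now_ms < ev->when_ms) return 0;

    retval = ev->proc(ev);
    ev->runs++;
    if (retval != DEMO_NOMORE)
        ev->when_ms = now_ms + retval;  /* periodic: re-arm relative to now */
    else
        ev->id = DEMO_DELETED_ID;       /* one-shot: mark for lazy removal */
    return 1;
}

/* Periodic handler in the style of serverCron, with hz assumed to be 10. */
int demoCron(demoEvent *ev) { (void)ev; return 1000 / 10; }

/* One-shot handler: asks never to be called again. */
int demoOnce(demoEvent *ev) { (void)ev; return DEMO_NOMORE; }
```

A handler that returns 100 is re-armed 100 ms after the moment it ran, while one that returns DEMO_NOMORE is only marked as deleted and never runs again.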

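Note: the details of serverCron are not covered here; in brief, its main jobs are

  1. Collecting expired keys (expiry is also performed lazily on lookup)
  2. Asynchronously freeing connections (sockets) that need to be closed
  3. Updating some statistics
  4. Incrementally rehashing hash tables (dict) that need to grow or shrink
  5. Triggering BGSAVE / AOF rewrite, and handling terminated child processes
  6. Handling the various client timeouts
  7. Replication reconnection
  8. ......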

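Where do events come from?

File events

File events are handled through I/O multiplexing. On Linux, epoll reports the ready events, which are then placed into aeFiredEvent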

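aeCreateFileEvent and aeDeleteFileEvent add and remove the file descriptors and events to watch. Once events are ready, aeApiPoll puts the corresponding file descriptors and event masks into aeFiredEvent, and aeProcessEvents runs the callback for each one, that is, the handler function registered when the event was subscribed.

Redis's ae_epoll.c is a wrapper around epoll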
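
The registration side can be pictured as a table indexed by file descriptor. The sketch below is a simplified model of that idea, not the code from ae.c: the REG_* constants, demoRegistry and its functions are hypothetical, the table size is fixed at 16, and the call into epoll_ctl that a real implementation needs is left out.

```c
#include <assert.h>
#include <stddef.h>

#define REG_NONE     0
#define REG_READABLE 1
#define REG_WRITABLE 2
#define REG_SETSIZE  16

typedef void demoFileProc(int fd, void *clientData, int mask);

typedef struct demoFileEvent {
    int mask;              /* REG_READABLE | REG_WRITABLE, or REG_NONE */
    demoFileProc *rproc;   /* handler for readable */
    demoFileProc *wproc;   /* handler for writable */
    void *clientData;
} demoFileEvent;

typedef struct demoRegistry {
    int maxfd;                           /* highest registered fd, -1 if none */
    demoFileEvent events[REG_SETSIZE];   /* slot i describes fd i */
} demoRegistry;

void demoRegistryInit(demoRegistry *r) {
    int i;
    r->maxfd = -1;
    for (i = 0; i < REG_SETSIZE; i++) {
        r->events[i].mask = REG_NONE;
        r->events[i].rproc = NULL;
        r->events[i].wproc = NULL;
        r->events[i].clientData = NULL;
    }
}

/* Returns 0 on success, -1 if fd does not fit in the table. */
int demoRegister(demoRegistry *r, int fd, int mask, demoFileProc *proc, void *clientData) {
    demoFileEvent *fe;
    if (fd < 0 || fd >= REG_SETSIZE) return -1;
    fe = &r->events[fd];
    fe->mask |= mask;
    if (mask & REG_READABLE) fe->rproc = proc;
    if (mask & REG_WRITABLE) fe->wproc = proc;
    fe->clientData = clientData;
    if (fd > r->maxfd) r->maxfd = fd;
    return 0;
}

void demoUnregister(demoRegistry *r, int fd, int mask) {
    int j;
    if (fd < 0 || fd >= REG_SETSIZE) return;
    r->events[fd].mask &= ~mask;
    if (fd == r->maxfd && r->events[fd].mask == REG_NONE) {
        /* Walk down to the next fd that is still registered. */
        for (j = r->maxfd - 1; j >= 0; j--)
            if (r->events[j].mask != REG_NONE) break;
        r->maxfd = j;
    }
}

/* Example handler: counts invocations through clientData. */
void demoCountHit(int fd, void *clientData, int mask) {
    (void)fd; (void)mask;
    (*(int *)clientData)++;
}
```

Indexing by fd is what makes the lookup &eventLoop->events[eventLoop->fired[j].fd] in the dispatch loop a constant-time operation.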

Time events

Time events are created during server init and end up in the linked list eventLoop->timeEventHead

void initServer(void){
    /* ... omitted ... */
    if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
        serverPanic("Can't create the serverCron time event.");
        exit(1);
    }
}
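
What "put into the linked list" means can be sketched as follows. This is a simplified model under stated assumptions rather than the real aeCreateTimeEvent: the struct and function names are hypothetical, the deadline is a single millisecond value, and plain malloc/free stand in for Redis's allocator. Each new event gets the next id and is linked in at the head, so the list is unsorted.

```c
#include <assert.h>
#include <stdlib.h>

typedef struct demoTimeEvent {
    long long id;
    long long when_ms;              /* absolute deadline */
    struct demoTimeEvent *next;
} demoTimeEvent;

typedef struct demoTimerList {
    long long nextId;               /* id handed to the next created event */
    demoTimeEvent *head;
} demoTimerList;

/* Creates an event due delay_ms after now_ms.
 * Returns the new event id, or -1 if allocation failed. */
long long demoCreateTimeEvent(demoTimerList *list, long long now_ms, long long delay_ms) {
    demoTimeEvent *te = malloc(sizeof(*te));
    if (te == NULL) return -1;
    te->id = list->nextId++;
    te->when_ms = now_ms + delay_ms;
    te->next = list->head;          /* O(1) insertion at the head */
    list->head = te;
    return te->id;
}

/* Releases every node and leaves the list empty. */
void demoFreeTimerList(demoTimerList *list) {
    while (list->head != NULL) {
        demoTimeEvent *next = list->head->next;
        free(list->head);
        list->head = next;
    }
}
```

Because insertion does not keep the list ordered, finding the nearest deadline requires scanning every node, which is acceptable when only a handful of time events exist.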