Redis源码 - 事件处理 part1
背景
为了进一步了解linux EPoll I/O 处理 & linux Socket,想找一个框架进行学习。选来选去Redis最合适。
-
Why Redis
-
Redis代码量相对netty 和nginx 要少很多,简洁就是美😄
-
comments够多
-
附加值高
- 还可以学习如何做replicate
- 还可以学习leader election
-
-
How to do
-
Redis 源码v3.2.13
- 简单入手,没有从最新版代码开始
-
GDB debug
-
chatgpt 作为code的理解者给出答案,从而进一步结合代码,验证自己的理解
-
Linux 4.4.0 内核源码分析 TCP 实现 pdf
-
TLDR; Redis 事件处理
首先,我们先看下set key1 v1,redis gdb程序堆栈, 请倒序观看
Thread 1 "redis-server" hit Breakpoint 1, dictAdd (d=0x7ffff6a181e0, key=0x7ffff6a16701, val=0x7ffff6a5a200) at dict.c:324
(gdb) bt
#0 dictAdd (d=0x7ffff6a181e0, key=0x7ffff6a16701, val=0x7ffff6a5a200) at dict.c:324
#1 0x000000000043f179 in dbAdd (db=0x7ffff6a27800, key=0x7ffff6a5a218, val=0x7ffff6a5a200) at db.c:159
#2 0x000000000043f2af in setKey (db=0x7ffff6a27800, key=0x7ffff6a5a218, val=0x7ffff6a5a200) at db.c:186
#3 0x000000000044b873 in setGenericCommand (c=0x7ffff6b4e680, flags=0, key=0x7ffff6a5a218, val=0x7ffff6a5a200, expire=0x0, unit=0, ok_reply=0x0, abort_reply=0x0) at t_string.c:86
#4 0x000000000044bbc6 in setCommand (c=0x7ffff6b4e680) at t_string.c:139
#5 0x0000000000429c9d in call (c=0x7ffff6b4e680, flags=15) at server.c:2265
#6 0x000000000042a7e9 in processCommand (c=0x7ffff6b4e680) at server.c:2544
#7 0x000000000043a377 in processInputBuffer (c=0x7ffff6b4e680) at networking.c:1311
#8 0x000000000043a668 in readQueryFromClient (el=0x7ffff6a2f0a0, fd=5, privdata=0x7ffff6b4e680, mask=1) at networking.c:1375
#9 0x0000000000421c85 in aeProcessEvents (eventLoop=0x7ffff6a2f0a0, flags=3) at ae.c:431
#10 0x0000000000421e68 in aeMain (eventLoop=0x7ffff6a2f0a0) at ae.c:489
#11 0x000000000042e591 in main (argc=2, argv=0x7fffffffe428) at server.c:4133
aeMain 是整个event loop的入口
void aeMain(aeEventLoop *eventLoop) {
eventLoop->stop = 0;
while (!eventLoop->stop) {
if (eventLoop->beforesleep != NULL)
eventLoop->beforesleep(eventLoop);
aeProcessEvents(eventLoop, AE_ALL_EVENTS);
}
}
aeProcessEvents是处理逻辑
/* Process every pending time event, then every pending file event
* (that may be registered by time event callbacks just processed).
* Without special flags the function sleeps until some file event
* fires, or when the next time event occurs (if any).
*
* If flags is 0, the function does nothing and returns.
* if flags has AE_ALL_EVENTS set, all the kind of events are processed.
* if flags has AE_FILE_EVENTS set, file events are processed.
* if flags has AE_TIME_EVENTS set, time events are processed.
* if flags has AE_DONT_WAIT set the function returns ASAP until all
* the events that's possible to process without to wait are processed.
*
* The function returns the number of events processed. */
int aeProcessEvents(aeEventLoop *eventLoop, int flags)
aeEventLoop 负责保存待处理文件事件和时间事件的结构体, 以及保存大量事件执行的上下文信息
typedef struct aeEventLoop {
int maxfd; /* highest file descriptor currently registered */
int setsize; /* max number of file descriptors tracked */
long long timeEventNextId;
time_t lastTime; /* Used to detect system clock skew */
aeFileEvent *events; /* Registered events */
aeFiredEvent *fired; /* Fired events */
aeTimeEvent *timeEventHead;
int stop;
void *apidata; /* This is used for polling API specific data */
aeBeforeSleepProc *beforesleep;
} aeEventLoop;
-
aeFileEvent 表示文件事件 flag = AE_FILE_EVENTS
-
aeTimeEvent 表示时间事件 flag = AE_TIME_EVENTS
- flag = AE_DONT_WAIT 需要立即处理
-
aeFiredEvent 表示Fired(就绪)事件
- Linux 系统通过 epoll_wait 接口获取就绪事件,每个事件分别存储在 aeFiredEvent 数组中
总体说来,Redis主要处理两类事件:文件 and 时间
文件事件
- Redis 先计算shortest最近事件发生的时间,再计算需等待的时间tvp
- aeApiPoll(eventLoop, tvp) EPoll wait获取fired事件
- 处理订阅的read/write事件
结合redis,看下gdb调用stack更加直观
(gdb) bt
#0 0x00007ffff73620f7 in epoll_wait () from /lib64/libc.so.6
#1 0x0000000000420fdd in aeApiPoll (eventLoop=0x7ffff6a2f0a0, tvp=0x7fffffffe260) at ae_epoll.c:112
#2 0x0000000000421bd0 in aeProcessEvents (eventLoop=0x7ffff6a2f0a0, flags=3) at ae.c:404
#3 0x0000000000421e68 in aeMain (eventLoop=0x7ffff6a2f0a0) at ae.c:489
#4 0x000000000042e591 in main (argc=2, argv=0x7fffffffe428) at server.c:4133
下面是大致的处理逻辑,去掉了部分注释
numevents = aeApiPoll(eventLoop, tvp);
for (j = 0; j < numevents; j++) {
aeFileEvent *fe = &eventLoop->events[eventLoop->fired[j].fd];
int mask = eventLoop->fired[j].mask;
int fd = eventLoop->fired[j].fd;
int fired = 0; /* Number of events fired for current fd. */
int invert = fe->mask & AE_BARRIER;
if (!invert && fe->mask & mask & AE_READABLE) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
/* Fire the writable event. */
if (fe->mask & mask & AE_WRITABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
/* If we have to invert the call, fire the readable event now
* after the writable one. */
if (invert && fe->mask & mask & AE_READABLE) {
if (!fired || fe->wfileProc != fe->rfileProc) {
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fired++;
}
}
processed++;
}
注意:AE_BARRIER 表示优先可写事件。正常情况,一般先读后写。
文件事件如果绑定了对应的读/写事件,就会执行对应的处理逻辑.这里不对细节进行展开
fe->rfileProc(eventLoop,fd,fe->clientData,mask);
fe->wfileProc(eventLoop,fd,fe->clientData,mask);
其中mask表示事件的类型值
那么具体的事件处理逻辑是什么呢?以read event为例
void readQueryFromClient(aeEventLoop *el, int fd, void *privdata, int mask);
👇
void processInputBuffer(client *c);
👇
int processCommand(client *c);
......
Redis如何监听到发生de这些事件呢?
epoll_wait调用
static int aeApiPoll(aeEventLoop *eventLoop, struct timeval *tvp) {
aeApiState *state = eventLoop->apidata;
int retval, numevents = 0;
retval = epoll_wait(state->epfd,state->events,eventLoop->setsize,
tvp ? (tvp->tv_sec*1000 + tvp->tv_usec/1000) : -1);
if (retval > 0) {
int j;
numevents = retval;
for (j = 0; j < numevents; j++) {
int mask = 0;
struct epoll_event *e = state->events+j;
if (e->events & EPOLLIN) mask |= AE_READABLE;
if (e->events & EPOLLOUT) mask |= AE_WRITABLE;
if (e->events & EPOLLERR) mask |= AE_WRITABLE;
if (e->events & EPOLLHUP) mask |= AE_WRITABLE;
eventLoop->fired[j].fd = e->data.fd;
eventLoop->fired[j].mask = mask;
}
}
return numevents;
}
时间事件
Redis里包括了两种时间事件
- 周期性定时事件
- 非周期性定时事件,只执行1次
使用GDB打印执行堆栈:
(gdb) bt
#0 serverCron (eventLoop=0x7ffff6a2f0a0, id=0, clientData=0x0) at server.c:1110
#1 0x00000000004219c8 in processTimeEvents (eventLoop=0x7ffff6a2f0a0) at ae.c:326
#2 0x0000000000421d4f in aeProcessEvents (eventLoop=0x7ffff6a2f0a0, flags=3) at ae.c:457
#3 0x0000000000421e68 in aeMain (eventLoop=0x7ffff6a2f0a0) at ae.c:489
#4 0x000000000042e591 in main (argc=2, argv=0x7fffffffe428) at server.c:4133
核心处理过程如下
/* Process time events */
static int processTimeEvents(aeEventLoop *eventLoop) {
int processed = 0;
aeTimeEvent *te, *prev;
long long maxId;
time_t now = time(NULL);
if (now < eventLoop->lastTime) {
te = eventLoop->timeEventHead;
while(te) {
te->when_sec = 0;
te = te->next;
}
}
eventLoop->lastTime = now;
prev = NULL;
te = eventLoop->timeEventHead;
maxId = eventLoop->timeEventNextId-1;
while(te) {
long now_sec, now_ms;
long long id;
/* Remove events scheduled for deletion. */
if (te->id == AE_DELETED_EVENT_ID) {
aeTimeEvent *next = te->next;
if (prev == NULL)
eventLoop->timeEventHead = te->next;
else
prev->next = te->next;
if (te->finalizerProc)
te->finalizerProc(eventLoop, te->clientData);
zfree(te);
te = next;
continue;
}
if (te->id > maxId) {
te = te->next;
continue;
}
aeGetTime(&now_sec, &now_ms);
if (now_sec > te->when_sec ||
(now_sec == te->when_sec && now_ms >= te->when_ms))
{
int retval;
id = te->id;
retval = te->timeProc(eventLoop, id, te->clientData);
processed++;
if (retval != AE_NOMORE) {
aeAddMillisecondsToNow(retval,&te->when_sec,&te->when_ms);
} else {
te->id = AE_DELETED_EVENT_ID;
}
}
prev = te;
te = te->next;
}
return processed;
}
流程
- 如果系统时间调整过,比如从未来调整为准确值,重置已有的now < lastTime 的事件 if (now < eventLoop->lastTime) { te = eventLoop->timeEventHead; while(te) { te->when_sec = 0; te = te->next; } }
- 循环取出事件,从链表结构eventLoop->timeEventHead里
- 移除已经标记为删除的时间事件
- 处理事件te->timeProc(eventLoop, id, te->clientData) ,返回处理结果retval int值,
- retval == AE_NOMORE(-1):表明事件不需要再次触发,需要删除。将时间事件的 id 设置为 AE_DELETED_EVENT_ID,等待下次 aeProcessEvents 执行时将事件清除
- retval != AE_NOMORE:如果返回的值不是 AE_NOMORE,说明还需要再次触发这个事件。在这种情况下,将当前系统时间与返回的时间间隔相加,更新时间事件 te 的触发时间,再下一次循环被处理(这种做法防止了空循环的可能,每一次循环都有活干)
- timeProc最终指向的是serverCron函数,return 1000/server.hz;hz在redis config文件里配置
注:serverCron函数的细节不在这里展开,说说主要完成哪些工作
- 过期键的收集(还会在查找时以惰性方式执行)
- 异步回收需要关闭的链接(socket)
- 更新一些统计信息
- 需要扩容和缩容的哈希表(dict)进行数据迁移
- 触发 BGSAVE / AOF 重写,并处理终止的子进程。
- 客户端的各种超时
- Replication重连
- ......
事件从哪里来?
文件事件
文件事件的处理使用了I/O多路复用,linux平台下,事件的创建由EPoll完成,放至aeFiredEvent
使用 aeCreateFileEvent、 aeDeleteFileEvent 来添加删除需要监听的文件描述符以及事件 在事件就绪后,调用 aeApiPoll 把对应的文件描述符和事件放入 aeFiredEvent,并在 processEvents 方法中执行事件对应的回调 -- 订阅事件时注册的handler函数
redis ae_poll.c是对epoll使用封装
时间事件
时间事件的创建,是在server init过程中完成的,最终将事件放入链表eventLoop->timeEventHead
void initServer(void){
省略
if(aeCreateTimeEvent(server.el, 1, serverCron, NULL, NULL) == AE_ERR) {
serverPanic("Can't create the serverCron time event.");
exit(1);
}
}