Redis Source Code - How Commands Are Handled
This article can be read as a sequel to Event Handling PART 2: the previous post covered how a Redis client connects and how the server learns that client data is ready to read.
This post looks at what happens after the data is ready: how the command is processed and how the reply is sent back to the client.
Starting from the execution of a single command
127.0.0.1:6379> set key1 v1
(gdb) bt
#0 _addReplyToBuffer (c=0x7ffff6b4e680, s=0x7ffff6a16689 "+OK\r\n", len=5) at networking.c:214
#1 0x0000000000437ba1 in addReply (c=0x7ffff6b4e680, obj=0x7ffff6a178c0) at networking.c:340
#2 0x000000000044b922 in setGenericCommand (c=0x7ffff6b4e680, flags=0, key=0x7ffff6a551b8, val=0x7ffff6a551a0, expire=0x0, unit=0, ok_reply=0x0, abort_reply=0x0) at t_string.c:92
#3 0x000000000044bbc6 in setCommand (c=0x7ffff6b4e680) at t_string.c:139
#4 0x0000000000429c9d in call (c=0x7ffff6b4e680, flags=15) at server.c:2265
#5 0x000000000042a7e9 in processCommand (c=0x7ffff6b4e680) at server.c:2544
#6 0x000000000043a377 in processInputBuffer (c=0x7ffff6b4e680) at networking.c:1311
#7 0x000000000043a668 in readQueryFromClient (el=0x7ffff6a2f0a0, fd=5, privdata=0x7ffff6b4e680, mask=1) at networking.c:1375
#8 0x0000000000421c85 in aeProcessEvents (eventLoop=0x7ffff6a2f0a0, flags=3) at ae.c:431
#9 0x0000000000421e68 in aeMain (eventLoop=0x7ffff6a2f0a0) at ae.c:489
#10 0x000000000042e591 in main (argc=2, argv=0x7fffffffe428) at server.c:4133
The server uses epoll_wait to wait for ready events; when a client request arrives, the registered callback fires (covered in detail in the previous post, so it is not repeated here).
The callback is readQueryFromClient, which starts handling the command request:
readQueryFromClient -> processInputBuffer -> processCommand -> call -> setCommand...
- readQueryFromClient reads the data from the client connection's socket into the client's query buffer
- processInputBuffer parses the received data and dispatches on two request types (see the sketch after this list)
  - inline requests: PROTO_REQ_INLINE
  - RESP multibulk requests: PROTO_REQ_MULTIBULK
- processCommand
  - lookupCommand looks up the parsed command name in the server.commands hash table, which is populated from the static redisCommandTable at startup
  - call() executes the command
    - addReply
      - calls _addReplyToBuffer and related helpers to append the result to the client's output buffer
  - processCommand also has to handle the case where the memory limit has been reached, which will be covered in a later article
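For orientation, here is a trimmed-down sketch of that dispatch, modelled on the networking.c version the backtrace above points at (Redis 3.2/4.0 era). Error handling, protocol size limits and several client-flag checks are omitted, so treat it as an illustration rather than the exact source.
/* Simplified sketch of processInputBuffer() -- not the verbatim source. */
void processInputBuffer(client *c) {
    /* Keep parsing while there is unread data in the query buffer. */
    while (sdslen(c->querybuf)) {
        /* Decide the request type from the first byte: '*' starts a
         * RESP multibulk request, anything else is an inline request. */
        if (!c->reqtype) {
            if (c->querybuf[0] == '*')
                c->reqtype = PROTO_REQ_MULTIBULK;
            else
                c->reqtype = PROTO_REQ_INLINE;
        }

        /* Parse the buffer into c->argv / c->argc; stop if the request
         * is not complete yet. */
        if (c->reqtype == PROTO_REQ_INLINE) {
            if (processInlineBuffer(c) != C_OK) break;
        } else {
            if (processMultibulkBuffer(c) != C_OK) break;
        }

        /* A fully parsed command is executed via processCommand(). */
        if (c->argc == 0) {
            resetClient(c);
        } else {
            if (processCommand(c) == C_OK)
                resetClient(c);
        }
    }
}
The inline format is what you get when talking to Redis with telnet or netcat; regular clients such as redis-cli always send RESP multibulk requests, so PROTO_REQ_MULTIBULK is the common path.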
/* Call() is the core of Redis execution of a command.
*
* The following flags can be passed:
* CMD_CALL_NONE No flags.
* CMD_CALL_SLOWLOG Check command speed and log in the slow log if needed.
* CMD_CALL_STATS Populate command stats.
* CMD_CALL_PROPAGATE_AOF Append command to AOF if it modified the dataset
* or if the client flags are forcing propagation.
 * CMD_CALL_PROPAGATE_REPL Send command to slaves if it modified the dataset
* or if the client flags are forcing propagation.
* CMD_CALL_PROPAGATE Alias for PROPAGATE_AOF|PROPAGATE_REPL.
* CMD_CALL_FULL Alias for SLOWLOG|STATS|PROPAGATE.
*
* The exact propagation behavior depends on the client flags.
* Specifically:
*
* 1. If the client flags CLIENT_FORCE_AOF or CLIENT_FORCE_REPL are set
* and assuming the corresponding CMD_CALL_PROPAGATE_AOF/REPL is set
* in the call flags, then the command is propagated even if the
* dataset was not affected by the command.
* 2. If the client flags CLIENT_PREVENT_REPL_PROP or CLIENT_PREVENT_AOF_PROP
* are set, the propagation into AOF or to slaves is not performed even
* if the command modified the dataset.
*
* Note that regardless of the client flags, if CMD_CALL_PROPAGATE_AOF
* or CMD_CALL_PROPAGATE_REPL are not set, then respectively AOF or
* slaves propagation will never occur.
*
* Client flags are modified by the implementation of a given command
* using the following API:
*
* forceCommandPropagation(client *c, int flags);
* preventCommandPropagation(client *c);
* preventCommandAOF(client *c);
* preventCommandReplication(client *c);
*
*/
void call(client *c, int flags)
/* If this function gets called we already read a whole
* command, arguments are in the client argv/argc fields.
* processCommand() execute the command or prepare the
* server for a bulk read from the client.
*
* If C_OK is returned the client is still alive and valid and
* other operations can be performed by the caller. Otherwise
* if C_ERR is returned the client was destroyed (i.e. after QUIT). */
int processCommand(client *c) {
/* The QUIT command is handled separately. Normal command procs will
* go through checking for replication and QUIT will cause trouble
* when FORCE_REPLICATION is enabled and would be implemented in
* a regular command proc. */
if (!strcasecmp(c->argv[0]->ptr,"quit")) {
addReply(c,shared.ok);
c->flags |= CLIENT_CLOSE_AFTER_REPLY;
return C_ERR;
}
/* Now lookup the command and check ASAP about trivial error conditions
* such as wrong arity, bad command name and so forth. */
c->cmd = c->lastcmd = lookupCommand(c->argv[0]->ptr);
......
}
struct redisCommand *lookupCommand(sds name) {
return dictFetchValue(server.commands, name);
}
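Looking back at the backtrace at the top, setCommand answered the client through addReply. A simplified sketch of addReply, using the 3.2/4.0-era helper names that appear in that backtrace (the real function also deals with the different object encodings):
/* Simplified sketch of addReply() -- encoding handling and error paths omitted. */
void addReply(client *c, robj *obj) {
    /* prepareClientToWrite() flags the client as having pending output
     * and, the first time, links it into server.clients_pending_write
     * so that beforeSleep() can flush it later. */
    if (prepareClientToWrite(c) != C_OK) return;

    /* Try the fixed-size output buffer c->buf first; only when it cannot
     * hold the data is the object appended to the reply list c->reply. */
    if (_addReplyToBuffer(c, obj->ptr, sdslen(obj->ptr)) != C_OK)
        _addReplyObjectToList(c, obj);
}
The fixed 16 KB c->buf absorbs the common small replies ("+OK\r\n" in the SET example above) without any extra allocation; only large replies pay the cost of the reply list.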
After addReply, how does the data actually get written back to the client?
Each iteration of the aeMain loop calls beforeSleep -> handleClientsWithPendingWrites -> writeToClient (a condensed sketch follows the backtrace below)
(gdb) bt
#0 writeToClient (fd=5, c=0x7ffff6b4e680, handler_installed=0) at networking.c:967
#1 0x0000000000439733 in handleClientsWithPendingWrites () at networking.c:1010
#2 0x000000000042795d in beforeSleep (eventLoop=0x7ffff6a2f0a0) at server.c:1366
#3 0x0000000000421e57 in aeMain (eventLoop=0x7ffff6a2f0a0) at ae.c:488
#4 0x000000000042e591 in main (argc=2, argv=0x7fffffffe428) at server.c:4133
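A condensed sketch of that flush step, again with 3.2/4.0-era names and with the replication/AOF special cases and error handling removed. The idea is to try a synchronous write first and only fall back to registering a writable event handler when the output buffers could not be drained in one go:
/* Condensed sketch of handleClientsWithPendingWrites() -- illustration only. */
int handleClientsWithPendingWrites(void) {
    listIter li;
    listNode *ln;
    int processed = listLength(server.clients_pending_write);

    listRewind(server.clients_pending_write, &li);
    while ((ln = listNext(&li)) != NULL) {
        client *c = listNodeValue(ln);
        c->flags &= ~CLIENT_PENDING_WRITE;
        listDelNode(server.clients_pending_write, ln);

        /* Try to flush c->buf and c->reply to the socket right away. */
        if (writeToClient(c->fd, c, 0) == C_ERR) continue;

        /* If data is still pending, install the writable-event handler
         * (sendReplyToClient) so epoll tells us when to retry. */
        if (clientHasPendingReplies(c)) {
            aeCreateFileEvent(server.el, c->fd, AE_WRITABLE,
                              sendReplyToClient, c);
        }
    }
    return processed;
}
Writing the reply directly in beforeSleep avoids one epoll round trip per reply in the common case where the whole reply fits into the socket send buffer; the write event handler is only needed for slow clients or large replies.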
writeToClient ultimately copies the reply into the socket's send buffer: the data is written back to the client with the write system call.
So how does write handle the data?
The send path goes from user space into the kernel through the system call: the fd is used to find the corresponding file, the socket associated with that file is located, and the data is sent through it.
Socket filesystem handling
write -> fd -> file -> sock_sendmsg
/* fs/read_write.c */
SYSCALL_DEFINE3(write, unsigned int, fd, const char __user *, buf,
size_t, count) {
return ksys_write(fd, buf, count);
}
ssize_t ksys_write(unsigned int fd, const char __user *buf, size_t count) {
...
if (f.file) {
...
ret = vfs_write(f.file, buf, count, &pos);
...
}
...
}
ssize_t vfs_write(struct file *file, const char __user *buf, size_t count, loff_t *pos) {
...
ret = __vfs_write(file, buf, count, pos);
...
}
ssize_t __vfs_write(struct file *file, const char __user *p, size_t count,
loff_t *pos) {
if (file->f_op->write)
return file->f_op->write(file, p, count, pos);
/* sock_write_iter */
else if (file->f_op->write_iter)
return new_sync_write(file, p, count, pos);
else
return -EINVAL;
}
static ssize_t new_sync_write(struct file *filp, const char __user *buf, size_t len, loff_t *ppos) {
...
ret = call_write_iter(filp, &kiocb, &iter);
...
}
static inline ssize_t call_write_iter(struct file *file, struct kiocb *kio,
struct iov_iter *iter) {
/* sock_write_iter */
return file->f_op->write_iter(kio, iter);
}
static ssize_t sock_write_iter(struct kiocb *iocb, struct iov_iter *from) {
struct file *file = iocb->ki_filp;
struct socket *sock = file->private_data;
struct msghdr msg = {
.msg_iter = *from,
.msg_iocb = iocb
};
ssize_t res;
...
res = sock_sendmsg(sock, &msg);
*from = msg.msg_iter;
return res;
}
int sock_sendmsg(struct socket *sock, struct msghdr *msg) {
int err = security_socket_sendmsg(sock, msg,
msg_data_left(msg));
return err ?: sock_sendmsg_nosec(sock, msg);
}
/* net/socket.c */
static inline int sock_sendmsg_nosec(struct socket *sock, struct msghdr *msg) {
/* inet_sendmsg */
int ret = sock->ops->sendmsg(sock, msg, msg_data_left(msg));
...
}
/* net/ipv4/af_inet.c */
int inet_sendmsg(struct socket *sock, struct msghdr *msg, size_t size) {
struct sock *sk = sock->sk;
...
return sk->sk_prot->sendmsg(sk, msg, size);
}
/* Location: net/ipv4/tcp.c
 *
 * Parameters:
 *   sk   the socket used for the transmission
 *   msg  the user-space message (data) to transmit
 *   size the size of the data to transmit
 */
int tcp_sendmsg(struct sock *sk, struct msghdr *msg, size_t size) {
...
// mainly fills the user-space data into the kernel send queue so it can be transmitted
ret = tcp_sendmsg_locked(sk, msg, size);
...
}
......
/* Push out any pending frames which were held back due to
* TCP_CORK or attempt at coalescing tiny packets.
* The socket must be locked by the caller.
*/
void __tcp_push_pending_frames(struct sock *sk, unsigned int cur_mss,
int nonagle)
{
/* If we are closed, the bytes will have to remain here.
* In time closedown will finish, we empty the write queue and
* all will be happy.
*/
if (unlikely(sk->sk_state == TCP_CLOSE))
return;
if (tcp_write_xmit(sk, cur_mss, nonagle, 0,
sk_gfp_mask(sk, GFP_ATOMIC)))
tcp_check_probe_timer(sk);
}
/* This routine writes packets to the network. It advances the
* send_head. This happens as incoming acks open up the remote
* window for us.
*
* LARGESEND note: !tcp_urg_mode is overkill, only frames between
* snd_up-64k-mss .. snd_up cannot be large. However, taking into
* account rare use of URG, this is not a big flaw.
*
* Send at most one packet when push_one > 0. Temporarily ignore
* cwnd limit to force at most one packet out when push_one == 2.
* Returns true, if no segments are in flight and we have queued segments,
* but cannot send anything now because of SWS or another problem.
*/
static bool tcp_write_xmit(struct sock *sk, unsigned int mss_now, int nonagle,
int push_one, gfp_t gfp)
As the call chain above shows, the work ends up in tcp_sendmsg. In other words, the socket system call copies the data from user space into kernel space and then hands it to the TCP/IP stack.
- sk_buff is the socket data buffer: it holds the packet data being received or sent, so that the layers of the network protocol stack can pass the data between each other seamlessly.
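An abridged view of struct sk_buff (include/linux/skbuff.h), keeping only the fields relevant to this discussion and simplifying their types; the real structure is much larger:
/* Abridged struct sk_buff -- field types and layout simplified. */
struct sk_buff {
    struct sk_buff    *next;    /* sk_buffs are linked into queues          */
    struct sk_buff    *prev;
    struct sock       *sk;      /* owning socket                            */
    struct net_device *dev;     /* device the packet leaves by / arrives on */
    unsigned int       len;     /* length of the packet data                */
    unsigned char     *head;    /* start of the allocated buffer            */
    unsigned char     *data;    /* start of the current layer's data        */
    unsigned char     *tail;    /* end of the data                          */
    unsigned char     *end;     /* end of the allocated buffer              */
};
Each layer adds or strips its header by moving the data pointer (skb_push()/skb_pull()) rather than copying the payload, which is what makes the hand-off between protocol layers "seamless".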
TCP layer handling
tcp_sendmsg -> tcp_sendmsg_locked -> __tcp_push_pending_frames (or tcp_push_one) -> tcp_write_xmit
The core of tcp_write_xmit() is a loop: each iteration calls tcp_send_head() to fetch the sk_buff at the head of the send queue and exits once the queue has been drained. Inside the loop (an outline sketch follows this list):
- tcp_init_tso_segs() does the TSO (TCP Segmentation Offload) bookkeeping. When a large packet has to be sent, it can either be segmented in the protocol stack or the segmentation can be deferred to the NIC hardware to reduce CPU load.
- tcp_cwnd_test() checks whether the congestion window currently allows sending and, if so, how many sk_buffs may still go out.
- tcp_snd_wnd_test() checks whether the first sk_buff's sequence range is acceptable, i.e. the relationship between the sk_buff's end_seq and tcp_wnd_end(tp): whether the sk_buff still falls inside the send (sliding) window.
- tso_segs == 1 may be caused by the Nagle algorithm, which has to be tested for; it must also be decided whether TSO should be deferred to the NIC.
- tcp_mss_split_point() decides whether the sk_buff has to be split because it exceeds the MSS; it also checks whether the data fits inside the sliding window, and if only part of it does, the sk_buff likewise has to be split, which is what tso_fragment() is called for.
- tcp_small_queue_check() applies the TCP small queues limit: for each TCP flow it caps the number of bytes that may be queued for transmission at the same time, controlled by the net.ipv4.tcp_limit_output_bytes sysctl. Data beyond that limit is held back and sent later via a tasklet, so TCP cannot push out more packets simply by enlarging its buffers.
- tcp_transmit_skb() does the actual sending: it fills in the TCP header and passes the segment from the transport layer down to the IP layer.
The complex flow-control logic also lives here; it is not expanded on in this post.
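The list above can be condensed into the following outline of the loop. The helpers it calls are static functions inside net/ipv4/tcp_output.c and the real function tracks far more state, so this only mirrors the order of the checks described above:
/* Outline of the tcp_write_xmit() loop -- illustrative, not the kernel source. */
static bool tcp_write_xmit_outline(struct sock *sk, unsigned int mss_now,
                                   int nonagle, int push_one, gfp_t gfp)
{
    struct tcp_sock *tp = tcp_sk(sk);
    struct sk_buff *skb;

    while ((skb = tcp_send_head(sk)) != NULL) {
        /* TSO bookkeeping: how many segments does this skb stand for? */
        unsigned int tso_segs = tcp_init_tso_segs(skb, mss_now);

        /* 1. Congestion window: is there any quota left to send? */
        if (!tcp_cwnd_test(tp, skb))
            break;

        /* 2. Receive window: does the skb's end_seq still fit before
         *    tcp_wnd_end(tp)? */
        if (!tcp_snd_wnd_test(tp, skb, mss_now))
            break;

        /* 3. With tso_segs == 1 the Nagle test applies; larger skbs may
         *    defer segmentation to the NIC, get split at the MSS/window
         *    boundary (tso_fragment) and pass the small-queues check
         *    (tcp_small_queue_check). */
        if (tso_segs == 1 && !tcp_nagle_test(tp, skb, mss_now, nonagle))
            break;

        /* 4. Build the TCP header and push the segment down to the IP layer. */
        if (tcp_transmit_skb(sk, skb, 1, gfp))
            break;

        if (push_one)
            break;
    }
    return false;
}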
IP/MAC/Driver layer handling
Too low-level for this post; omitted.
Closing thoughts
This post skips the RESP protocol parsing itself; there are already plenty of excellent articles on it, so there is no need to repeat them here.
Redis targets in-memory database operations and chose a deliberately simple design for its protocol:
easy to parse (serialize and deserialize) and easy to understand, which is well worth learning from.
Reference: read more