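Redis Source Code - How Network Connections Are Managed
Preface
The previous article, Redis Event Handling, described how clients connect.
This article focuses on how the server manages clients.
Client-related data structures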
struct redisServer {
    ......
    list *clients; /* List of active clients */
    list *clients_to_close; /* Clients to close asynchronously */
    list *clients_pending_write; /* There is to write or install handler. */
    ......
};
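- clients: the list of clients; each newly accepted connection is stored in this list.
- clients_to_close: the list of clients to be closed asynchronously.
- clients_pending_write: the list of clients with deferred writes. Replies are sent asynchronously: data is not sent as soon as a request has been read and processed. Instead, after running the command logic the server writes the reply into the client's output buffers (buf/reply) and records the client in this list; the pending replies are then sent together in beforeSleep. (See the source of clientInstallWriteHandler.)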
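The deferred-write idea behind clients_pending_write can be illustrated with a toy model. This is only a sketch under simplifying assumptions, not the Redis implementation: toy_client, toy_add_reply, toy_flush_pending and the fixed-size pending array are hypothetical names, and "sending" just bumps a counter instead of calling write() on a socket.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define MAX_PENDING 16          /* hypothetical fixed capacity for the sketch */

typedef struct toy_client {
    char out[64];               /* stands in for the buf/reply output buffers */
    size_t outlen;              /* bytes currently queued in out */
    int pending;                /* 1 if already recorded in the pending list */
    size_t sent;                /* total bytes "sent" so far */
} toy_client;

static toy_client *pending_write[MAX_PENDING];
static size_t pending_count = 0;

/* Queue a reply: copy it into the client's buffer and record the client
 * once, instead of writing to the socket right away. */
void toy_add_reply(toy_client *c, const char *s) {
    size_t n = strlen(s);
    assert(c->outlen + n <= sizeof(c->out));
    if (!c->pending) {
        assert(pending_count < MAX_PENDING);
        c->pending = 1;
        pending_write[pending_count++] = c;
    }
    memcpy(c->out + c->outlen, s, n);
    c->outlen += n;
}

/* Flush every recorded client in one pass, the way a before-sleep hook
 * would. Returns the number of clients flushed. */
size_t toy_flush_pending(void) {
    size_t flushed = pending_count;
    for (size_t i = 0; i < pending_count; i++) {
        toy_client *c = pending_write[i];
        c->sent += c->outlen;   /* a real server would write() to the socket here */
        c->outlen = 0;
        c->pending = 0;
    }
    pending_count = 0;
    return flushed;
}
```

Recording a client only once, however many replies it accumulates, is what lets a single pass per event-loop iteration send everything that is pending.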
typedef struct client {
    uint64_t id; /* Client incremental unique ID. */
    int fd; /* Client socket. */
    redisDb *db; /* Pointer to currently SELECTed DB. */
    int dictid; /* ID of the currently SELECTed DB. */
    robj *name; /* As set by CLIENT SETNAME. */
    sds querybuf; /* Buffer we use to accumulate client queries. */
    size_t querybuf_peak; /* Recent (100ms or more) peak of querybuf size. */
    int argc; /* Num of arguments of current command. */
    robj **argv; /* Arguments of current command. */
    struct redisCommand *cmd, *lastcmd; /* Last command executed. */
    int reqtype; /* Request protocol type: PROTO_REQ_* */
    int multibulklen; /* Number of multi bulk arguments left to read. */
    long bulklen; /* Length of bulk argument in multi bulk request. */
    list *reply; /* List of reply objects to send to the client. */
    unsigned long long reply_bytes; /* Tot bytes of objects in reply list. */
    size_t sentlen; /* Amount of bytes already sent in the current
                       buffer or object being sent. */
    time_t ctime; /* Client creation time. */
    time_t lastinteraction; /* Time of the last interaction, used for timeout */
    time_t obuf_soft_limit_reached_time;
    int flags; /* Client flags: CLIENT_* macros. */
    int authenticated; /* When requirepass is non-NULL. */
    int replstate; /* Replication state if this is a slave. */
    int repl_put_online_on_ack; /* Install slave write handler on ACK. */
    int repldbfd; /* Replication DB file descriptor. */
    off_t repldboff; /* Replication DB file offset. */
    off_t repldbsize; /* Replication DB file size. */
    sds replpreamble; /* Replication DB preamble. */
    long long reploff; /* Replication offset if this is our master. */
    long long repl_ack_off; /* Replication ack offset, if this is a slave. */
    long long repl_ack_time;/* Replication ack time, if this is a slave. */
    long long psync_initial_offset; /* FULLRESYNC reply offset other slaves
                                       copying this slave output buffer
                                       should use. */
    char replrunid[CONFIG_RUN_ID_SIZE+1]; /* Master run id if is a master. */
    int slave_listening_port; /* As configured with: REPLCONF listening-port */
    char slave_ip[NET_IP_STR_LEN]; /* Optionally given by REPLCONF ip-address */
    int slave_capa; /* Slave capabilities: SLAVE_CAPA_* bitwise OR. */
    multiState mstate; /* MULTI/EXEC state */
    int btype; /* Type of blocking op if CLIENT_BLOCKED. */
    blockingState bpop; /* blocking state */
    long long woff; /* Last write global replication offset. */
    list *watched_keys; /* Keys WATCHED for MULTI/EXEC CAS */
    dict *pubsub_channels; /* channels a client is interested in (SUBSCRIBE) */
    list *pubsub_patterns; /* patterns a client is interested in (SUBSCRIBE) */
    sds peerid; /* Cached peer ID. */
    /* Response buffer */
    int bufpos;
    char buf[PROTO_REPLY_CHUNK_BYTES];
} client;
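- querybuf: the read buffer; the server reads the data sent by the client and appends it to client.querybuf.
- fd: the client socket.
- bufpos: the write position in the reply buffer buf, i.e. the number of bytes currently stored in buf.
- argv: the argument array of the current command, with argc holding the argument count. For example, for the command set key1 v1, argc is 3 and argv holds the strings ["set","key1","v1"].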
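To make the argc/argv example concrete, here is a small sketch that splits a space-separated command line into words. It is a simplification: Redis stores the arguments as robj pointers and parses its own wire protocol, while the hypothetical toy_split_args below works on a plain C string and ignores quoting.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define TOY_MAX_ARGS 8   /* hypothetical limit for the sketch */

/* Split a space-separated command line in place and fill argv with
 * pointers to each word. Returns the argument count. The input buffer
 * is modified: separators are overwritten with '\0'. */
int toy_split_args(char *line, char *argv[TOY_MAX_ARGS]) {
    int argc = 0;
    char *p = line;
    assert(line != NULL);
    while (*p != '\0' && argc < TOY_MAX_ARGS) {
        while (*p == ' ') p++;              /* skip separators */
        if (*p == '\0') break;
        argv[argc++] = p;                   /* start of a word */
        while (*p != '\0' && *p != ' ') p++;
        if (*p == ' ') *p++ = '\0';         /* terminate the word */
    }
    return argc;
}
```

Called on a writable copy of "set key1 v1", it returns 3 and leaves the three words in the array, matching the example above.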
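Timeout management
Network applications face two basic problems:
- queuing
- timeouts
Redis handles client timeouts in a fairly simple way:
now - c->lastinteraction > server.maxidletime
where:
- lastinteraction is updated to the current system time on every readQueryFromClient call
- slaves, masters, clients blocked in BLPOP, and Pub/Sub clients are excluded
The call chain is: clientsCron -> clientsCronHandleTimeout -> freeClient -> unlinkClient -> aeDeleteFileEvent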
void clientsCron(void) {
    /* Make sure to process at least numclients/server.hz of clients
     * per call. Since this function is called server.hz times per second
     * we are sure that in the worst case we process all the clients in 1
     * second. */
    int numclients = listLength(server.clients);
    int iterations = numclients/server.hz;
    mstime_t now = mstime();
    /* Process at least a few clients while we are at it, even if we need
     * to process less than CLIENTS_CRON_MIN_ITERATIONS to meet our contract
     * of processing each client once per second. */
    if (iterations < CLIENTS_CRON_MIN_ITERATIONS)
        iterations = (numclients < CLIENTS_CRON_MIN_ITERATIONS) ?
                     numclients : CLIENTS_CRON_MIN_ITERATIONS;
    while(listLength(server.clients) && iterations--) {
        client *c;
        listNode *head;
        /* Rotate the list, take the current head, process.
         * This way if the client must be removed from the list it's the
         * first element and we don't incur into O(N) computation. */
        listRotate(server.clients);
        head = listFirst(server.clients);
        c = listNodeValue(head);
        /* The following functions do different service checks on the client.
         * The protocol is that they return non-zero if the client was
         * terminated. */
        if (clientsCronHandleTimeout(c,now)) continue;
        if (clientsCronResizeQueryBuffer(c)) continue;
    }
}
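To visit every client connection once per second, each call must process numclients/server.hz clients, because clientsCron is called server.hz times per second. With a very large number of clients this step becomes expensive: for example, with 10,000 client connections and the default server.hz=10, each call has to process 1,000 clients.
Because Redis uses a single-threaded model, other requests are blocked in the meantime and latency increases.
Redis 5.0 added the dynamic-hz parameter, enabled by default. With many client connections it adapts the effective hz so that the number of clients processed per call stays bounded.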
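The arithmetic can be checked with a short sketch. toy_cron_iterations mirrors the iteration count computed in clientsCron; toy_adaptive_hz shows one plausible adaptation rule, doubling hz until a single call handles a bounded number of clients. The function names, the doubling rule, and the constants 5, 200 and 500 are assumptions made for illustration and should be checked against the Redis version in use.

```c
#include <assert.h>

#define TOY_CRON_MIN_ITERATIONS 5     /* stands in for CLIENTS_CRON_MIN_ITERATIONS; value assumed */
#define TOY_MAX_CLIENTS_PER_TICK 200  /* assumed per-call target */
#define TOY_MAX_HZ 500                /* assumed upper bound for hz */

/* Number of clients visited by one clientsCron call. */
int toy_cron_iterations(int numclients, int hz) {
    assert(hz > 0 && numclients >= 0);
    int iterations = numclients / hz;
    if (iterations < TOY_CRON_MIN_ITERATIONS)
        iterations = (numclients < TOY_CRON_MIN_ITERATIONS) ?
                     numclients : TOY_CRON_MIN_ITERATIONS;
    return iterations;
}

/* Double hz until one call handles at most TOY_MAX_CLIENTS_PER_TICK
 * clients, capped at TOY_MAX_HZ. */
int toy_adaptive_hz(int numclients, int configured_hz) {
    assert(configured_hz > 0 && numclients >= 0);
    int hz = configured_hz;
    while (numclients / hz > TOY_MAX_CLIENTS_PER_TICK) {
        hz *= 2;
        if (hz > TOY_MAX_HZ) { hz = TOY_MAX_HZ; break; }
    }
    return hz;
}
```

Under these assumptions, 10,000 clients at hz=10 mean 1,000 clients per call, while the adapted hz of 80 brings that down to 125 per call at the cost of running the cron more often.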
/* Check for timeouts. Returns non-zero if the client was terminated.
 * The function gets the current time in milliseconds as argument since
 * it gets called multiple times in a loop, so calling gettimeofday() for
 * each iteration would be costly without any actual gain. */
int clientsCronHandleTimeout(client *c, mstime_t now_ms) {
    time_t now = now_ms/1000;
    if (server.maxidletime &&
        !(c->flags & CLIENT_SLAVE) && /* no timeout for slaves */
        !(c->flags & CLIENT_MASTER) && /* no timeout for masters */
        !(c->flags & CLIENT_BLOCKED) && /* no timeout for BLPOP */
        !(c->flags & CLIENT_PUBSUB) && /* no timeout for Pub/Sub clients */
        (now - c->lastinteraction > server.maxidletime))
    {
        serverLog(LL_VERBOSE,"Closing idle client");
        freeClient(c);
        return 1;
    } else if (c->flags & CLIENT_BLOCKED) {
        /* Blocked OPS timeout is handled with milliseconds resolution.
         * However note that the actual resolution is limited by
         * server.hz. */
        if (c->bpop.timeout != 0 && c->bpop.timeout < now_ms) {
            /* Handle blocking operation specific timeout. */
            replyToBlockedClientTimedOut(c);
            unblockClient(c);
        } else if (server.cluster_enabled) {
            /* Cluster: handle unblock & redirect of clients blocked
             * into keys no longer served by this server. */
            if (clusterRedirectBlockedClientIfNeeded(c))
                unblockClient(c);
        }
    }
    return 0;
}
void freeClient(client *c) {
    ......
    /* Unlink the client: this will close the socket, remove the I/O
     * handlers, and remove references of the client from different
     * places where active clients may be referenced. */
    unlinkClient(c);
    ......
}
/* Remove the specified client from global lists where the client could
 * be referenced, not including the Pub/Sub channels.
 * This is used by freeClient() and replicationCacheMaster(). */
void unlinkClient(client *c) {
    ......
    /* Certain operations must be done only if the client has an active socket.
     * If the client was already unlinked or if it's a "fake client" the
     * fd is already set to -1. */
    if (c->fd != -1) {
        /* Remove from the list of active clients. */
        ln = listSearchKey(server.clients,c);
        serverAssert(ln != NULL);
        listDelNode(server.clients,ln);
        /* Unregister async I/O handlers and close the socket. */
        aeDeleteFileEvent(server.el,c->fd,AE_READABLE);
        aeDeleteFileEvent(server.el,c->fd,AE_WRITABLE);
        close(c->fd);
        c->fd = -1;
    }
    ......
}
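- The client is released in unlinkClient.
- In listSearchKey, Redis walks the doubly linked list server.clients to find the matching client object, then calls listDelNode to remove it from server.clients.
- listSearchKey traverses the list in O(n) time, so when many short-lived connections hit Redis, the frequent client releases drive up Redis CPU usage.
- Redis 5.0 optimized this operation: when the client is created, a pointer to its node in server.clients is saved, so freeClient can delete that listNode directly without traversing server.clients again.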
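The saved-node idea can be sketched with a minimal doubly linked list. This is an illustration under assumptions rather than the Redis code: the toy_ types, the list_node field, and the two functions are hypothetical names chosen for the example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdlib.h>

typedef struct toy_node {
    struct toy_node *prev, *next;
    void *value;
} toy_node;

typedef struct toy_list {
    toy_node *head, *tail;
    size_t len;
} toy_list;

typedef struct toy_client {
    int fd;
    toy_node *list_node;   /* hypothetical field: this client's node in the list */
} toy_client;

/* Append the client and remember its node so removal needs no search.
 * Returns 0 on success, -1 if allocation fails. */
int toy_link_client(toy_list *l, toy_client *c) {
    assert(l != NULL && c != NULL);
    toy_node *n = malloc(sizeof(*n));
    if (n == NULL) return -1;
    n->value = c;
    n->next = NULL;
    n->prev = l->tail;
    if (l->tail) l->tail->next = n; else l->head = n;
    l->tail = n;
    l->len++;
    c->list_node = n;
    return 0;
}

/* Unlink in O(1) using the saved node pointer; no list traversal. */
void toy_unlink_client(toy_list *l, toy_client *c) {
    assert(l != NULL && c != NULL);
    toy_node *n = c->list_node;
    if (n == NULL) return;              /* already unlinked */
    if (n->prev) n->prev->next = n->next; else l->head = n->next;
    if (n->next) n->next->prev = n->prev; else l->tail = n->prev;
    l->len--;
    free(n);
    c->list_node = NULL;
}
```

The trade-off is one extra pointer per client in exchange for constant-time removal, which matters most when connections are opened and closed at a high rate.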
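Connection limits
Default setting in redis.conf: maxclients 10000
- How the limit is applied
adjustOpenFilesLimit implements the limit. It calls setrlimit (a system call) to set the process's open-file limit, trying to get as close as possible to the required value.
Note: CONFIG_MIN_RESERVED_FDS is the number of file descriptors the program is expected to need for normal operation (listening sockets, log files, and so on).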
/* This function will try to raise the max number of open files accordingly to
 * the configured max number of clients. It also reserves a number of file
 * descriptors (CONFIG_MIN_RESERVED_FDS) for extra operations of
 * persistence, listening sockets, log files and so forth.
 *
 * If it will not be possible to set the limit accordingly to the configured
 * max number of clients, the function will do the reverse setting
 * server.maxclients to the value that we can actually handle. */
void adjustOpenFilesLimit(void) {
    rlim_t maxfiles = server.maxclients+CONFIG_MIN_RESERVED_FDS;
    struct rlimit limit;
    if (getrlimit(RLIMIT_NOFILE,&limit) == -1) {
        serverLog(LL_WARNING,"Unable to obtain the current NOFILE limit (%s), assuming 1024 and setting the max clients configuration accordingly.",
            strerror(errno));
        server.maxclients = 1024-CONFIG_MIN_RESERVED_FDS;
    } else {
        rlim_t oldlimit = limit.rlim_cur;
        /* Set the max number of files if the current limit is not enough
         * for our needs. */
        if (oldlimit < maxfiles) {
            rlim_t bestlimit;
            int setrlimit_error = 0;
            /* Try to set the file limit to match 'maxfiles' or at least
             * to the higher value supported less than maxfiles. */
            bestlimit = maxfiles;
            while(bestlimit > oldlimit) {
                rlim_t decr_step = 16;
                limit.rlim_cur = bestlimit;
                limit.rlim_max = bestlimit;
                if (setrlimit(RLIMIT_NOFILE,&limit) != -1) break;
                setrlimit_error = errno;
                /* We failed to set file limit to 'bestlimit'. Try with a
                 * smaller limit decrementing by a few FDs per iteration. */
                if (bestlimit < decr_step) break;
                bestlimit -= decr_step;
            }
            /* Assume that the limit we get initially is still valid if
             * our last try was even lower. */
            if (bestlimit < oldlimit) bestlimit = oldlimit;
            if (bestlimit < maxfiles) {
                int old_maxclients = server.maxclients;
                server.maxclients = bestlimit-CONFIG_MIN_RESERVED_FDS;
                if (server.maxclients < 1) {
                    exit(1);
                }
            } else {
                serverLog(LL_NOTICE,"Increased maximum number of open files "
                    "to %llu (it was originally set to %llu).",
                    (unsigned long long) maxfiles,
                    (unsigned long long) oldlimit);
            }
        }
    }
}
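The net effect of adjustOpenFilesLimit on server.maxclients can be summarized in a pure function. This is a sketch for reasoning about the outcome, not a replacement for the code above: toy_effective_maxclients is a hypothetical helper, and the reserved count of 32 is an assumed value for CONFIG_MIN_RESERVED_FDS.

```c
#include <assert.h>

#define TOY_MIN_RESERVED_FDS 32  /* stands in for CONFIG_MIN_RESERVED_FDS; value assumed */

/* Given the requested maxclients and the best open-file limit the OS
 * granted, return the maxclients value the server can actually honor,
 * or -1 if not even one client fits (the case where the server exits). */
long toy_effective_maxclients(long requested, long granted_fd_limit) {
    assert(requested > 0 && granted_fd_limit >= 0);
    long needed = requested + TOY_MIN_RESERVED_FDS;
    if (granted_fd_limit >= needed) return requested;   /* limit is enough */
    long usable = granted_fd_limit - TOY_MIN_RESERVED_FDS;
    return (usable < 1) ? -1 : usable;
}
```

For example, with maxclients 10000 and a granted limit of only 1024 descriptors, the function returns 992 under the assumed reserve of 32.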
- Closing excess connections
When listLength(server.clients) > server.maxclients, the newly accepted client is closed.
static void acceptCommonHandler(int fd, int flags, char *ip) {
    client *c;
    if ((c = createClient(fd)) == NULL) {
        serverLog(LL_WARNING,
            "Error registering fd event for the new client: %s (fd=%d)",
            strerror(errno),fd);
        close(fd); /* May be already closed, just ignore errors */
        return;
    }
    /* If maxclient directive is set and this is one client more... close the
     * connection. Note that we create the client instead to check before
     * for this condition, since now the socket is already set in non-blocking
     * mode and we can send an error for free using the Kernel I/O */
    if (listLength(server.clients) > server.maxclients) {
        char *err = "-ERR max number of clients reached\r\n";
        /* That's a best effort error message, don't check write errors */
        if (write(c->fd,err,strlen(err)) == -1) {
            /* Nothing to do, Just to avoid the warning... */
        }
        server.stat_rejected_conn++;
        freeClient(c);
        return;
    }
    ......
}
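- Are there other limits?
System-level file limits
When a process reaches its open-file limit, calls that create new descriptors, such as open() or accept(), fail with errno EMFILE; the system does not send the process a signal or force it to exit.
This is not covered further here.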
Query buffer handling for connections
When it is allocated
- createClient: querybuf starts out empty (length 0)
- readQueryFromClient -> sdsMakeRoomFor: 1024*32
When the buffer is resized
- clientsCron -> clientsCronResizeQueryBuffer
/* The client query buffer is an sds.c string that can end with a lot of
 * free space not used, this function reclaims space if needed.
 *
 * The function always returns 0 as it never terminates the client. */
int clientsCronResizeQueryBuffer(client *c) {
    size_t querybuf_size = sdsAllocSize(c->querybuf);
    time_t idletime = server.unixtime - c->lastinteraction;
    /* There are two conditions to resize the query buffer:
     * 1) Query buffer is > BIG_ARG and too big for latest peak.
     * 2) Client is inactive and the buffer is bigger than 1k. */
    if (((querybuf_size > PROTO_MBULK_BIG_ARG) &&
         (querybuf_size/(c->querybuf_peak+1)) > 2) ||
        (querybuf_size > 1024 && idletime > 2))
    {
        /* Only resize the query buffer if it is actually wasting space. */
        if (sdsavail(c->querybuf) > 1024) {
            c->querybuf = sdsRemoveFreeSpace(c->querybuf);
        }
    }
    /* Reset the peak again to capture the peak memory usage in the next
     * cycle. */
    c->querybuf_peak = 0;
    return 0;
}
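The buffer is resized when either of the following conditions holds, and only if more than 1K of the buffer is unused:
- the query buffer is larger than 32K and much larger than its recent data peak (the ratio of buffer size to peak exceeds 2)
- the query buffer is larger than 1K and the client is currently inactive (idle for more than 2 seconds)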
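The resize decision can be isolated into a small predicate, which makes the thresholds easy to test. This is a sketch derived from the conditions in clientsCronResizeQueryBuffer: toy_should_shrink and TOY_BIG_ARG are hypothetical names, and the inputs are passed explicitly instead of being read from a client structure.

```c
#include <assert.h>
#include <stddef.h>

#define TOY_BIG_ARG (1024*32)  /* the 32K threshold; stands in for PROTO_MBULK_BIG_ARG */

/* Return 1 if the query buffer should be shrunk, 0 otherwise.
 * size: allocated buffer size; peak: recent peak of used bytes;
 * idle_seconds: time since the last interaction; avail: unused bytes. */
int toy_should_shrink(size_t size, size_t peak, long idle_seconds, size_t avail) {
    assert(avail <= size);
    int too_big_for_peak = size > TOY_BIG_ARG && size / (peak + 1) > 2;
    int idle_and_large = size > 1024 && idle_seconds > 2;
    /* Shrink only when the buffer is actually wasting space. */
    return (too_big_for_peak || idle_and_large) && avail > 1024;
}
```

A 64K buffer whose recent peak was 1K is shrunk even for an active client, while a 64K buffer with a 40K peak is left alone until the client goes idle.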