Redis Source Code - Scale Mechanism: Cluster, Part 1
Redis Cluster Design Goals
- High performance and linear scalability, up to 1000 nodes. There are no proxies, asynchronous replication is used, and no merge operations are performed on values.
- Write safety: the system tries (best effort) to retain all writes issued by clients connected to the majority of master nodes. Usually the window during which an acknowledged write can be lost is small; the window is larger when a client is in a minority partition.
- Availability: Redis Cluster survives partitions in which the majority of master nodes are reachable and every unreachable master has at least one reachable replica.
  - Further supported by replica migration
Other features:
- The cluster key space is divided into slots; each key is stored on the node that serves its slot.
- All single-key commands available in standalone Redis are implemented.
- Commands that perform complex multi-key operations (such as set union and intersection) are supported only when all the keys involved hash to the same slot.
- Hash tags are implemented to force certain keys to be stored in the same hash slot.
- Unlike standalone Redis, the cluster does not support multiple databases: only database 0 is available, and SELECT is not allowed.
Core Components Overview
Communication Mechanism
In Redis Cluster, nodes store the data and maintain the cluster state, including the mapping of keys to the correct nodes. Cluster nodes also automatically discover other nodes, detect non-working nodes, and promote replicas to masters when needed, so the cluster can keep operating after a failure.
All cluster nodes are connected over a TCP bus with a binary protocol, the Redis Cluster Bus. Every node is connected to every other node through this bus. Nodes use a gossip protocol to spread cluster information: to discover new nodes, to send ping packets that verify all other nodes are working properly, and to send cluster messages that signal specific conditions. The cluster bus is also used to propagate Pub/Sub messages across the cluster and to orchestrate manual failovers when requested by users (a manual failover is initiated directly by the system administrator rather than by the Redis Cluster failure detector).
Communication Messages
- Redis defines a struct, clusterMsg, that represents one message exchanged between nodes. It carries the sending node's name, IP, cluster bus port and the slots it is responsible for, together with the message type, the message length and the message body itself (a sketch of how this header is filled follows the message-type defines below).
- The key structure inside the body is clusterMsgDataGossip:
  - ping_sent records when a PING was last sent to the gossiped node
  - pong_received records when a PONG was last received from it
/* Cluster node flags and macros. */
#define CLUSTER_NODE_MASTER 1 /* The node is a master */
#define CLUSTER_NODE_SLAVE 2 /* The node is a slave */
#define CLUSTER_NODE_PFAIL 4 /* Failure? Need acknowledge */
#define CLUSTER_NODE_FAIL 8 /* The node is believed to be malfunctioning */
#define CLUSTER_NODE_MYSELF 16 /* This node is myself */
#define CLUSTER_NODE_HANDSHAKE 32 /* We have still to exchange the first ping */
#define CLUSTER_NODE_NOADDR 64 /* We don't know the address of this node */
#define CLUSTER_NODE_MEET 128 /* Send a MEET message to this node */
/* clusterLink encapsulates everything needed to talk with a remote node. */
typedef struct clusterLink {
mstime_t ctime; /* Link creation time */
int fd; /* TCP socket file descriptor */
sds sndbuf; /* Packet send buffer */
sds rcvbuf; /* Packet reception buffer */
struct clusterNode *node; /* Node related to this link if any, or NULL */
} clusterLink;
typedef struct {
char sig[4]; /* Signature "RCmb" (Redis Cluster message bus). */
uint32_t totlen; /* Total length of this message */
uint16_t ver; /* Protocol version, currently set to 1. */
uint16_t port; /* TCP base port number. */
uint16_t type; /* Message type */
uint16_t count; /* Only used for some kind of messages. */
uint64_t currentEpoch; /* The epoch accordingly to the sending node. */
uint64_t configEpoch; /* The config epoch if it's a master, or the last
epoch advertised by its master if it is a
slave. */
uint64_t offset; /* Master replication offset if node is a master or
processed replication offset if node is a slave. */
char sender[CLUSTER_NAMELEN]; /* Name of the sender node */
unsigned char myslots[CLUSTER_SLOTS/8];
char slaveof[CLUSTER_NAMELEN];
char myip[NET_IP_STR_LEN]; /* Sender IP, if not all zeroed. */
char notused1[34]; /* 34 bytes reserved for future usage. */
uint16_t cport; /* Sender TCP cluster bus port */
uint16_t flags; /* Sender node flags */
unsigned char state; /* Cluster state from the POV of the sender */
unsigned char mflags[3]; /* Message flags: CLUSTERMSG_FLAG[012]_... */
union clusterMsgData data;
} clusterMsg;
union clusterMsgData {
/* PING, MEET and PONG */
struct {
/* Array of N clusterMsgDataGossip structures */
clusterMsgDataGossip gossip[1];
} ping;
/* FAIL */
struct {
clusterMsgDataFail about;
} fail;
/* PUBLISH */
struct {
clusterMsgDataPublish msg;
} publish;
/* UPDATE */
struct {
clusterMsgDataUpdate nodecfg;
} update;
/* MODULE */
struct {
clusterMsgModule msg;
} module;
};
/* Initially we don't know our "name", but we'll find it once we connect
* to the first node, using the getsockname() function. Then we'll use this
* address for all the next messages. */
typedef struct {
char nodename[CLUSTER_NAMELEN];
uint32_t ping_sent;
uint32_t pong_received;
char ip[NET_IP_STR_LEN]; /* IP address last time it was seen */
uint16_t port; /* base port last time it was seen */
uint16_t cport; /* cluster port last time it was seen */
uint16_t flags; /* node->flags copy */
uint32_t notused1;
} clusterMsgDataGossip;
/* Message types.
*
* Note that the PING, PONG and MEET messages are actually the same exact
* kind of packet. PONG is the reply to ping, in the exact format as a PING,
* while MEET is a special PING that forces the receiver to add the sender
* as a node (if it is not already in the list). */
#define CLUSTERMSG_TYPE_PING 0 /* Ping */
#define CLUSTERMSG_TYPE_PONG 1 /* Pong (reply to Ping) */
#define CLUSTERMSG_TYPE_MEET 2 /* Meet "let's join" message */
#define CLUSTERMSG_TYPE_FAIL 3 /* Mark node xxx as failing */
#define CLUSTERMSG_TYPE_PUBLISH 4 /* Pub/Sub Publish propagation */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_REQUEST 5 /* May I failover? */
#define CLUSTERMSG_TYPE_FAILOVER_AUTH_ACK 6 /* Yes, you have my vote */
#define CLUSTERMSG_TYPE_UPDATE 7 /* Another node slots configuration */
#define CLUSTERMSG_TYPE_MFSTART 8 /* Pause clients for manual failover */
#define CLUSTERMSG_TYPE_MODULE 9 /* Module cluster API message. */
#define CLUSTERMSG_TYPE_COUNT 10 /* Total number of message types. */
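To see how these structures are used, here is a hedged, simplified sketch of how the fixed clusterMsg header is filled for a PING/PONG/MEET packet, loosely following clusterBuildMessageHdr in cluster.c; announce-address handling, several fields and all error paths are omitted, so treat it as an illustration rather than the actual source:
/* Simplified sketch, assuming the cluster.h definitions above are in scope. */
static void buildPingHeaderSketch(clusterMsg *hdr, int type) {
    memset(hdr, 0, sizeof(*hdr));
    hdr->sig[0] = 'R'; hdr->sig[1] = 'C';
    hdr->sig[2] = 'm'; hdr->sig[3] = 'b';                 /* "RCmb" bus signature */
    hdr->ver = htons(1);                                  /* protocol version */
    hdr->type = htons(type);                              /* PING, PONG or MEET */
    /* A replica advertises its master's slots and config epoch. */
    clusterNode *master = (nodeIsSlave(myself) && myself->slaveof) ?
                          myself->slaveof : myself;
    memcpy(hdr->sender, myself->name, CLUSTER_NAMELEN);
    memcpy(hdr->myslots, master->slots, sizeof(hdr->myslots));
    if (myself->slaveof)
        memcpy(hdr->slaveof, myself->slaveof->name, CLUSTER_NAMELEN);
    hdr->port = htons(server.port);                       /* client port */
    hdr->cport = htons(server.port + CLUSTER_PORT_INCR);  /* cluster bus port */
    hdr->flags = htons(myself->flags);
    hdr->state = server.cluster->state;
    hdr->currentEpoch = htonu64(server.cluster->currentEpoch);
    hdr->configEpoch = htonu64(master->configEpoch);
    /* For PING/PONG/MEET, totlen is the fixed header plus the gossip entries:
     * sizeof(clusterMsg) - sizeof(union clusterMsgData)
     *   + gossipcount * sizeof(clusterMsgDataGossip). */
}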
Node Handshake
- Triggered at cluster startup
  - initServer -> clusterInit creates the TCP listening sockets on the cluster bus port; clusterLink objects are used to manage communication with other nodes
  - clusterAcceptHandler accepts the incoming connections (a sketch of it follows the clusterCron excerpt below)
- CLUSTER MEET adds a node: handling the command only updates the local server.cluster->nodes dict, and the connection is established afterwards (see the handshake sketch after the clusterInit excerpt below)
- A node accepts another node as part of the cluster in only two ways:
  - Via a MEET message. A MEET message is exactly like a PING message, but it forces the receiver to accept the sender as part of the cluster.
  - Via gossip from an already trusted node. For example, if node A knows node B and node B knows node C, eventually B will send A gossip about C; at that point A registers C as part of the network and tries to connect to it.
- Implementation: clusterCron walks server.cluster->nodes and establishes a TCP connection to every node that has joined the cluster but has no link yet.
// In clusterInit():
if (listenToPort(server.port+CLUSTER_PORT_INCR,
server.cfd,&server.cfd_count) == C_ERR)
{
exit(1);
} else {
int j;
for (j = 0; j < server.cfd_count; j++) {
if (aeCreateFileEvent(server.el, server.cfd[j], AE_READABLE,
clusterAcceptHandler, NULL) == AE_ERR)
serverPanic("Unrecoverable error creating Redis Cluster "
"file event.");
}
}
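For the CLUSTER MEET path described above, here is a hedged sketch of the registration step, loosely following clusterStartHandshake in cluster.c (address normalization and duplicate-handshake checks omitted); the command merely records the node, and the connection is made later by clusterCron, shown next:
/* Simplified sketch of CLUSTER MEET registration (based on clusterStartHandshake). */
static void startHandshakeSketch(char *ip, int port, int cport) {
    /* The node gets a random name and handshake flags; its real name is
     * learned from the first PONG received over the link. */
    clusterNode *n = createClusterNode(NULL,
            CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_MEET);
    /* The real code copies a normalized NET_IP_STR_LEN buffer here. */
    snprintf(n->ip, NET_IP_STR_LEN, "%s", ip);
    n->port = port;
    n->cport = cport;
    clusterAddNode(n);   /* inserted into server.cluster->nodes, keyed by name */
}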
// In clusterCron():
if (node->link == NULL) {
int fd;
mstime_t old_ping_sent;
clusterLink *link;
fd = anetTcpNonBlockBindConnect(server.neterr, node->ip,
node->cport, NET_FIRST_BIND_ADDR);
if (fd == -1) {
/* We got a synchronous error from connect before
* clusterSendPing() had a chance to be called.
* If node->ping_sent is zero, failure detection can't work,
* so we claim we actually sent a ping now (that will
* be really sent as soon as the link is obtained). */
if (node->ping_sent == 0) node->ping_sent = mstime();
serverLog(LL_DEBUG, "Unable to connect to "
"Cluster Node [%s]:%d -> %s", node->ip,
node->cport, server.neterr);
continue;
}
link = createClusterLink(node);
link->fd = fd;
node->link = link;
aeCreateFileEvent(server.el,link->fd,AE_READABLE,
clusterReadHandler,link);
......
}
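On the accepting side, here is a hedged sketch of clusterAcceptHandler, which was registered in clusterInit above; the per-call accept loop, the max-accepts limit and error logging are trimmed:
/* Simplified sketch of clusterAcceptHandler (cluster.c). */
static void clusterAcceptHandlerSketch(aeEventLoop *el, int fd, void *privdata, int mask) {
    char cip[NET_IP_STR_LEN];
    int cport;
    (void)el; (void)privdata; (void)mask;
    int cfd = anetTcpAccept(server.neterr, fd, cip, sizeof(cip), &cport);
    if (cfd == ANET_ERR) return;
    anetNonBlock(NULL, cfd);
    anetEnableTcpNoDelay(NULL, cfd);
    /* The link starts with node == NULL: we do not yet know which cluster
     * node is on the other end; clusterProcessPacket associates it later. */
    clusterLink *link = createClusterLink(NULL);
    link->fd = cfd;
    aeCreateFileEvent(server.el, cfd, AE_READABLE, clusterReadHandler, link);
}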
Gossip Protocol
- At a fixed frequency, a few random nodes are picked to communicate with: serverCron -> clusterCron, which runs every 100 ms.
- Every 10 iterations of clusterCron the branch below runs once, so a ping is sent roughly once per second.
- Five nodes are sampled at random and the one with the oldest pong_received time is pinged via clusterSendPing (see the sketch after the code excerpt below):
  - freshnodes = dictSize(server.cluster->nodes) - 2, i.e. the number of cluster nodes minus two (ourselves and the receiver of the message)
  - wanted defaults to 1/10 of the number of cluster nodes, but is at least 3 and at most freshnodes
  - maxiterations = wanted * 3
- The node that receives the PING handles it in the clusterReadHandler callback:
  - clusterReadHandler -> clusterProcessPacket
  - which in turn calls clusterSendPing to reply with a PONG
- The node that receives the PONG updates its locally recorded information about the sender and the gossiped nodes, again in clusterProcessPacket:
  - on a PONG, the locally recorded time of the latest PONG from the target node is updated
  - if the sender is a master, the locally recorded slot distribution is updated; clusterProcessGossipSection then processes the gossip body of the PING or PONG
/* Ping some random node 1 time every 10 iterations, so that we usually ping
* one random node every second. */
if (!(iteration % 10)) {
int j;
/* Check a few random nodes and ping the one with the oldest
* pong_received time. */
for (j = 0; j < 5; j++) {
de = dictGetRandomKey(server.cluster->nodes);
clusterNode *this = dictGetVal(de);
/* Don't ping nodes disconnected or with a ping currently active. */
if (this->link == NULL || this->ping_sent != 0) continue;
if (this->flags & (CLUSTER_NODE_MYSELF|CLUSTER_NODE_HANDSHAKE))
continue;
if (min_pong_node == NULL || min_pong > this->pong_received) {
min_pong_node = this;
min_pong = this->pong_received;
}
}
if (min_pong_node) {
serverLog(LL_DEBUG,"Pinging node %.40s", min_pong_node->name);
clusterSendPing(min_pong_node->link, CLUSTERMSG_TYPE_PING);
}
}
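The loop above only chooses the ping target; the gossip section attached to the packet is built inside clusterSendPing itself. A hedged, simplified sketch of that part (buffer allocation, PFAIL entries and byte-order conversion omitted) shows how freshnodes, wanted and maxiterations interact:
/* Simplified sketch of the gossip sampling in clusterSendPing (cluster.c). */
void clusterSendPingSketch(clusterLink *link, int type) {
    int gossipcount = 0;
    int freshnodes = dictSize(server.cluster->nodes) - 2; /* minus myself and the receiver */
    int wanted = dictSize(server.cluster->nodes) / 10;    /* ~1/10 of the cluster... */
    if (wanted < 3) wanted = 3;                           /* ...but at least 3... */
    if (wanted > freshnodes) wanted = freshnodes;         /* ...and at most freshnodes */
    int maxiterations = wanted * 3;                       /* bound the random sampling */

    if (link->node && type == CLUSTERMSG_TYPE_PING)
        link->node->ping_sent = mstime();                 /* used by failure detection */

    while (freshnodes > 0 && gossipcount < wanted && maxiterations--) {
        dictEntry *de = dictGetRandomKey(server.cluster->nodes);
        clusterNode *this = dictGetVal(de);

        if (this == myself) continue;                     /* never gossip about ourselves */
        if (this->flags & (CLUSTER_NODE_HANDSHAKE|CLUSTER_NODE_NOADDR) ||
            (this->link == NULL && this->numslots == 0))
        {
            freshnodes--;                                  /* not worth advertising */
            continue;
        }
        /* Append this node's name/ip/ports/flags as one clusterMsgDataGossip
         * entry of the outgoing packet (clusterSetGossipEntry in the source),
         * skipping duplicates already present in the gossip section. */
        freshnodes--;
        gossipcount++;
    }
    /* totlen = sizeof(clusterMsg) - sizeof(union clusterMsgData)
     *          + gossipcount * sizeof(clusterMsgDataGossip); then send. */
}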
Cluster topology
Key Distribution Model
- The cluster key space is split into 16384 hash slots.
- The slot of a key is computed from its CRC16 hash:
- HASH_SLOT = CRC16(key) mod 16384
/* -----------------------------------------------------------------------------
* Key space handling
* -------------------------------------------------------------------------- */
/* We have 16384 hash slots. The hash slot of a given key is obtained
* as the least significant 14 bits of the crc16 of the key.
*
* However if the key contains the {...} pattern, only the part between
* { and } is hashed. This may be useful in the future to force certain
* keys to be in the same node (assuming no resharding is in progress). */
unsigned int keyHashSlot(char *key, int keylen) {
int s, e; /* start-end indexes of { and } */
for (s = 0; s < keylen; s++)
if (key[s] == '{') break;
/* No '{' ? Hash the whole key. This is the base case. */
if (s == keylen) return crc16(key,keylen) & 0x3FFF;
/* '{' found? Check if we have the corresponding '}'. */
for (e = s+1; e < keylen; e++)
if (key[e] == '}') break;
/* No '}' or nothing between {} ? Hash the whole key. */
if (e == keylen || e == s+1) return crc16(key,keylen) & 0x3FFF;
/* If we are here there is both a { and a } on its right. Hash
* what is in the middle between { and }. */
return crc16(key+s+1,e-s-1) & 0x3FFF;
}
uint16_t crc16(const char *buf, int len) {
int counter;
uint16_t crc = 0;
for (counter = 0; counter < len; counter++)
crc = (crc<<8) ^ crc16tab[((crc>>8) ^ *buf++)&0x00FF];
return crc;
}
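As a standalone illustration of keyHashSlot and hash tags (not part of the Redis source), the demo below uses a bitwise XMODEM CRC16, which should produce the same values as the table-driven crc16 above, and shows that keys sharing the same {tag} land in the same slot:
/* Hash-tag demo: compile with a plain C compiler, no Redis headers needed. */
#include <stdio.h>
#include <string.h>
#include <stdint.h>

static uint16_t crc16_xmodem(const char *buf, int len) {
    uint16_t crc = 0;
    for (int i = 0; i < len; i++) {
        crc ^= (uint16_t)((unsigned char)buf[i]) << 8;
        for (int b = 0; b < 8; b++)
            crc = (crc & 0x8000) ? (uint16_t)((crc << 1) ^ 0x1021)
                                 : (uint16_t)(crc << 1);
    }
    return crc;
}

/* Same hash-tag rules as keyHashSlot above. */
static unsigned int hashSlot(const char *key) {
    int keylen = (int)strlen(key), s, e;
    for (s = 0; s < keylen; s++) if (key[s] == '{') break;
    if (s == keylen) return crc16_xmodem(key, keylen) & 0x3FFF;
    for (e = s + 1; e < keylen; e++) if (key[e] == '}') break;
    if (e == keylen || e == s + 1) return crc16_xmodem(key, keylen) & 0x3FFF;
    return crc16_xmodem(key + s + 1, e - s - 1) & 0x3FFF;
}

int main(void) {
    /* Both keys hash only on "user1000", so they map to the same slot and
     * multi-key commands such as MGET can operate on them together. */
    printf("%u\n", hashSlot("{user1000}.following"));
    printf("%u\n", hashSlot("{user1000}.followers"));
    return 0;
}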
Command Handling
Normal commands
Redirection commands
When data migrates because of rebalancing or node failure, request redirection is unavoidable.
- MOVED Redirection - rebalancing
For blocked clients, the chain is clientsCron -> clientsCronHandleTimeout (periodic handling of blocked clients) -> clusterRedirectBlockedClientIfNeeded -> clusterRedirectClient.
The redirect reply is built according to error_code (CLUSTER_REDIR_MOVED here):
void clusterRedirectClient(client *c, clusterNode *n, int hashslot, int error_code) {
......
else if (error_code == CLUSTER_REDIR_MOVED ||
error_code == CLUSTER_REDIR_ASK)
{
addReplySds(c,sdscatprintf(sdsempty(),
"-%s %d %s:%d\r\n",
(error_code == CLUSTER_REDIR_ASK) ? "ASK" : "MOVED",
hashslot,n->ip,n->port));
} else {
serverPanic("getNodeByQuery() unknown error.");
}
......
}
A Redis client is free to send queries to every node in the cluster, including replicas. The node analyzes the query and, if it is acceptable (that is, it references a single key, or multiple keys that all belong to the same hash slot), it looks up which node is responsible for the hash slot of the key(s).
If the slot is served by the node itself, the query is simply processed; otherwise the node checks its internal hash-slot-to-node map and replies to the client with a MOVED error:
GET x
-MOVED 3999 127.0.0.1:6381
3999 is the hash slot of the key.
The client must re-issue the request to 127.0.0.1:6381.
Until it receives the redirect error, the client stays blocked.
In other words, frequently moving slots increases client latency: keeping the slot layout as stable as possible and minimizing MOVED redirects reduces client-side latency.
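For illustration, here is a hypothetical client-side sketch using the hiredis client library (not part of the Redis source; minimal error handling): it parses the -MOVED reply and retries against the node that owns the slot.
/* Hedged example: follow a single MOVED redirect by hand with hiredis. */
#include <stdio.h>
#include <string.h>
#include <hiredis/hiredis.h>

int main(void) {
    redisContext *c = redisConnect("127.0.0.1", 6379);
    if (c == NULL || c->err) return 1;
    redisReply *r = redisCommand(c, "GET x");
    if (r && r->type == REDIS_REPLY_ERROR && strncmp(r->str, "MOVED ", 6) == 0) {
        /* Error looks like: "MOVED 3999 127.0.0.1:6381" */
        int slot, port;
        char host[64];
        if (sscanf(r->str, "MOVED %d %63[^:]:%d", &slot, host, &port) == 3) {
            freeReplyObject(r);
            redisFree(c);
            /* Re-issue the command against the node that serves the slot. */
            c = redisConnect(host, port);
            if (c == NULL || c->err) return 1;
            r = redisCommand(c, "GET x");
        }
    }
    if (r) {
        printf("type=%d value=%s\n", r->type,
               r->type == REDIS_REPLY_STRING ? r->str : "(non-string)");
        freeReplyObject(r);
    }
    redisFree(c);
    return 0;
}
Real cluster clients typically cache the slot-to-node map instead of following redirects on every request.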
- ASK Redirection
- Live reconfiguration
Redis Cluster supports adding and removing nodes while the cluster is running. Adding or removing a node is abstracted into the same operation: moving hash slots from one node to another. The same basic mechanism can therefore be used to rebalance the cluster, add nodes, or remove them.
To rebalance the cluster, a given set of hash slots is moved between nodes.
The core of the implementation is the ability to move hash slots. From a practical point of view, a hash slot is just a set of keys.
So during resharding Redis Cluster really just moves keys from one instance to another: moving a slot means moving all the keys that hash into that slot.
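During such a migration, the ASK redirection listed above comes into play. A hedged fragment, loosely following getNodeByQuery in cluster.c (variable names borrowed from that function), shows the gist:
/* Inside getNodeByQuery: the slot is being migrated away from this node. */
clusterNode *migrating = server.cluster->migrating_slots_to[slot];
if (migrating != NULL && missing_keys > 0) {
    /* Keys no longer present locally may already live on the importing node,
     * so the client is told to retry there (after sending ASKING). */
    if (error_code) *error_code = CLUSTER_REDIR_ASK;
    return migrating;  /* clusterRedirectClient then replies "-ASK <slot> <ip>:<port>" */
}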