聊聊Kafka 如何保证消息不丢
如何保证消息不丢?
Broker 视角
前提:暂不考虑异步flush磁盘的问题,这是另外一个话题
基本原则:冗余 + 容错
复制机制和分区的多副本架构是Kafka 可靠性保证的核心
-
多副本:replication.factor >= 3
-
ISR:min.insync.replicas = 2 至少
-
Leader选举: unclean.leader.election.enable=false
- 如果leader发生故障或不可用,快速选举新的leader以确保集群可用
- 禁用unclean选举,保障只有同步的副本可以被重新选主
- Epoch Number 单调递增的数字,用于确保只有最新的broker才能成为分区的leader。 在leader选举期间,具有最高Epoch Number将成为新的leader。
下面分别详细分析上面三点:
-
replication.factor
- Kafka中的分区副本包括两种类型:Leader Replica 和Follower Replica,每个分区在创建时都要选举一个副本作为领导者副本,其余的副本自动变为追随者副本
- Kafka 中,追随者副本是不对外提供服务的,所有的读写请求都必须发往leader Replica所在的 Broker,由该 Broker 负责处理。Follower Replica不处理客户端请求,它唯一的任务就是从Leader Replica异步拉取消息,并写入到自己的提交日志中,从而实现与leader的同步
- 如果副本因子为N,那么在N-1个broker 失效的情况下,仍然能够从主题读取数据或向主题写入数据。所以,更高的副本因子会带来更高的可用性、可靠性和更少的故障。另一方面,副本因子N需要至少N个broker ,而且会有N个数据副本,也就是说它们会占用N倍的磁盘空间。实际生产环境中一般会在可用性和存储硬件之间作出权衡
- 默认情况下,Kafka会确保分区的每个副本分布在不同的Broker上,但是如果这些Broker在同一个机架上,一旦机架的交换机发生故障,分区就会不可用。所以建议把Broker分布在不同的机架上,可以使用broker.rack参数配置Broker所在机架的名称
-
In-sync replica
- ISR中的副本都是与Leader Replica 进行同步的副本,所以不在该列表的follower会被认为与Leader是不同步的
- 首先可以明确的是:Leader Replica总是存在于ISR中。而follower Replica是否在ISR中,取决于该follower Replica 是否与Leader Replica保持了“同步”
- replica.lag.time.max.ms, 该参数表示follower Replica滞后与Leader Replica的最长时间间隔,默认是10秒
-
unclean.leader.election.enable = false
-
选择一个同步副本列表中的分区作为leader 分区的过程称为clean leader election
-
禁止 Unclean Leader 选举,非同步的replica将不再选举范围内,从而保障一定的一致性
-
重新选举就一定能保障数据不丢吗?
-
同步副本并不是完全同步的。由于复制是异步完成的,因此无法保证follower可以获取最新消息。比如Leader分区的最后一条消息的offset是100,此时副本的offset可能不是100
-
可以继续调节:
- zookeeper.session.timeout.ms:与zookeeper会话超时时间
- replica.lag.time.max.ms 同步副本滞后与leader副本的时间
-
没有完美的方案。要么选择AP要么选择CP
-
-
Producer 视角
-
失败重试
- retries=Long.MAX_VALUE
- 当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失
-
acks = all
- 表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义
-
如果Producer crash了呢?
-
这个是应用范畴了,非kafka所支持
- 本地消息表,当Producer 重启后,读取消息表继续发送
- local log,模拟WAL,使用BerkeleyDB or levelDB,设立检查点,检查点之后的数据重发,这里面会引入新的问题,重复数据,使用幂等性来解决
-
Consumer 视角
-
禁用自动提交:enable.auto.commit=false
-
消费者处理完消息之后再提交offset
-
消息交给业务处理出现异常该怎么办?
- 同样是非kafka范畴的问题
- A方案:建立dead letter queue,等待业务恢复处理
- B方案:同Producer crash的策略
- C方案:利用consumer buffer,个别消息处理失败的情况,设置consumer.pause,让其他的consumer不会再同步到消息,利用buffer继续处理
Kafka可以保障永久不丢失数据吗?
Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证
-
什么是已提交的消息?
- 当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交
-
有限度的持久化保证?
- 消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中至少有 1 个存活。只要这个条件成立,Kafka 就能保证你的这条消息永远不会丢失
- 如果没有broker存活,则不保障消息不丢
微服务里,数据先保存到数据库,再发消息到Kafka 如何保障可靠?
-
两个操作想要完全做到可靠,只能依靠分布式事务/分布式锁
-
有没有更为经济的办法?
-
补偿+幂等操作:见Producer crash,简单来说,就是最终一致性
-
outbox模式:
- 数据仅保存到数据库,RDS作为outbox,监听DB数据变化如binlog,来push消息到Kafka - 需要其他服务来支撑
- 数据仅发送到kafka,MQ作为outbox,消费端拉取消息保存至DB
-