聊聊Kafka 如何保证消息不丢

如何保证消息不丢?

Broker 视角

前提:暂不考虑异步flush磁盘的问题,这是另外一个话题

基本原则:冗余 + 容错

复制机制和分区的多副本架构是Kafka 可靠性保证的核心

  1. 多副本:replication.factor >= 3

  2. ISR:min.insync.replicas = 2 至少

  3. Leader选举: unclean.leader.election.enable=false

    • 如果leader发生故障或不可用,快速选举新的leader以确保集群可用
    • 禁用unclean选举,保障只有同步的副本可以被重新选主
    • Epoch Number 单调递增的数字,用于确保只有最新的broker才能成为分区的leader。 在leader选举期间,具有最高Epoch Number将成为新的leader。

下面分别详细分析上面三点:

  • replication.factor

    • Kafka中的分区副本包括两种类型:Leader Replica 和Follower Replica,每个分区在创建时都要选举一个副本作为领导者副本,其余的副本自动变为追随者副本
    • Kafka 中,追随者副本是不对外提供服务的,所有的读写请求都必须发往leader Replica所在的 Broker,由该 Broker 负责处理。Follower Replica不处理客户端请求,它唯一的任务就是从Leader Replica异步拉取消息,并写入到自己的提交日志中,从而实现与leader的同步
    • 如果副本因子为N,那么在N-1个broker 失效的情况下,仍然能够从主题读取数据或向主题写入数据。所以,更高的副本因子会带来更高的可用性、可靠性和更少的故障。另一方面,副本因子N需要至少N个broker ,而且会有N个数据副本,也就是说它们会占用N倍的磁盘空间。实际生产环境中一般会在可用性和存储硬件之间作出权衡
    • 默认情况下,Kafka会确保分区的每个副本分布在不同的Broker上,但是如果这些Broker在同一个机架上,一旦机架的交换机发生故障,分区就会不可用。所以建议把Broker分布在不同的机架上,可以使用broker.rack参数配置Broker所在机架的名称
  • In-sync replica

    • ISR中的副本都是与Leader Replica 进行同步的副本,所以不在该列表的follower会被认为与Leader是不同步的
    • 首先可以明确的是:Leader Replica总是存在于ISR中。而follower Replica是否在ISR中,取决于该follower Replica 是否与Leader Replica保持了“同步”
    • replica.lag.time.max.ms, 该参数表示follower Replica滞后与Leader Replica的最长时间间隔,默认是10秒
  • unclean.leader.election.enable = false

    • 选择一个同步副本列表中的分区作为leader 分区的过程称为clean leader election

    • 禁止 Unclean Leader 选举,非同步的replica将不再选举范围内,从而保障一定的一致性

    • 重新选举就一定能保障数据不丢吗?

      • 同步副本并不是完全同步的。由于复制是异步完成的,因此无法保证follower可以获取最新消息。比如Leader分区的最后一条消息的offset是100,此时副本的offset可能不是100

      • 可以继续调节:

        • zookeeper.session.timeout.ms:与zookeeper会话超时时间
        • replica.lag.time.max.ms 同步副本滞后与leader副本的时间
      • 没有完美的方案。要么选择AP要么选择CP

Producer 视角

  • 失败重试

    • retries=Long.MAX_VALUE
    • 当出现网络的瞬时抖动时,消息发送可能会失败,此时配置了 retries > 0 的 Producer 能够自动重试消息发送,避免消息丢失
  • acks = all

    • 表明所有副本 Broker 都要接收到消息,该消息才算是“已提交”。这是最高等级的“已提交”定义
  • 如果Producer crash了呢?

    • 这个是应用范畴了,非kafka所支持

      • 本地消息表,当Producer 重启后,读取消息表继续发送
      • local log,模拟WAL,使用BerkeleyDB or levelDB,设立检查点,检查点之后的数据重发,这里面会引入新的问题,重复数据,使用幂等性来解决

Consumer 视角

  • 禁用自动提交:enable.auto.commit=false

  • 消费者处理完消息之后再提交offset

  • 消息交给业务处理出现异常该怎么办?

    • 同样是非kafka范畴的问题
    • A方案:建立dead letter queue,等待业务恢复处理
    • B方案:同Producer crash的策略
    • C方案:利用consumer buffer,个别消息处理失败的情况,设置consumer.pause,让其他的consumer不会再同步到消息,利用buffer继续处理

Kafka可以保障永久不丢失数据吗?

Kafka 只对“已提交”的消息(committed message)做有限度的持久化保证

  • 什么是已提交的消息?

    • 当 Kafka 的若干个 Broker 成功地接收到一条消息并写入到日志文件后,它们会告诉生产者程序这条消息已成功提交
  • 有限度的持久化保证?

    • 消息保存在 N 个 Kafka Broker 上,那么这个前提条件就是这 N 个 Broker 中至少有 1 个存活。只要这个条件成立,Kafka 就能保证你的这条消息永远不会丢失
    • 如果没有broker存活,则不保障消息不丢

微服务里,数据先保存到数据库,再发消息到Kafka 如何保障可靠?

  • 两个操作想要完全做到可靠,只能依靠分布式事务/分布式锁

  • 有没有更为经济的办法?

    • 补偿+幂等操作:见Producer crash,简单来说,就是最终一致性

    • outbox模式:

      • 数据仅保存到数据库,RDS作为outbox,监听DB数据变化如binlog,来push消息到Kafka - 需要其他服务来支撑
      • 数据仅发送到kafka,MQ作为outbox,消费端拉取消息保存至DB