Kafka - 幂等性Producer

Producer幂等性

原文

背景

kafka 提供的at least once 语义,意味着发送的消息可能会传递一次或多次。日常处理中,人们真正想要的是“exactly once”语义,从而不会传递重复的消息

出现重复消息的常见原因有两个:(这里前提是,producer仅传输一次的情况下broker不会重复存储)

  1. 如果客户端尝试向集群发送消息并收到网络错误,重试可能会导致重复
  2. 如果消费者从主题读取消息然后崩溃,那么当消费者重新启动或另一个实例接管消费时,新消费者将从原始消费者的最后一个已知位置开始

第二个问题kafka通过offset commit来保障,第一个问题需要特殊处理

Kafka Producer 如何解决第一个问题

kafka通过在 at least once 的基础上加上幂等性来做到 exactly once

当然这是有约束的 exactly once,比如kafka要求单一会话内有效或者跨会话使用事务性有效等

这里我们先分析最简单的情况,在单一会话内如何做到幂等性

要做到幂等性,需要解决以下问题: 1. server 需要有能力识别一条数据到底是不是重复的?常用的手段是通过 唯一键/唯一 ID 来判断 2. 唯一键应该选择什么粒度?对于分布式存储系统来说,全局唯一要求太高,需要增加单独的功能。在可靠性的要求下,还需要分布式唯一ID的功能H/A,引入了复杂度。kafka使用的粒度是,topic partition 方式做区隔。重复数据的判断让 partition 的 leader 去判断处理 3. 分区粒度实现唯一键会不会有其他问题?这里需要考虑的问题是当一个Partition有来自多个 client 写入的情况,这些client之间是很难做到使用同一个唯一键,所以还需要对client进行标识,这就是PID。同一个 client 在处理多个 Topic-Partition 时是使用同一个 PID。 4. 如何区别每一条消息呢?sequence numbers,client 发送的每条消息都会带相应的 sequence number,broker端就是根据这个值来判断数据是否重复

kafka Producer 幂等性的实现原理:

PID(Producer ID)+ Topic-Partition + sequence numbers 还有前提:

  • acks = all
  • ENABLE_IDEMPOTENCE_CONFIG = true

kafka 如何实现的?

  1. Producer PID 申请

    • 每个 Producer 在初始化时都会被分配一个唯一的 PID。对于一个给定的 PID,sequence number 将会从0开始自增,每个 Topic-Partition 都会有一个独立的 sequence number。Producer在发送数据时,将会给每条 msg 标识一个 sequence number,Server 也就是通过这个来验证数据是否重复
    • PID 是全局唯一的,Producer故障后重新启动后会被分配一个新的PID,这也是幂等性无法做到跨会话的一个原因
  2. Server PID 管理

    • 向 ZooKeeper 申请,zk中有一个 /latest_producer_id_block 节点(临时借点?请看下面的思考),每个 Broker向 zk 申请一个 PID 段
    • 申请 PID 段的好处是减少与zk的交互
  3. Sequence Numbers 生成

    • 略。原子计数器即可

思考:

  1. server Pid 的过期时间,最简单的方法是与connection关联,失去连接即pid过期.但现实是在connection端开后,server仍然需要处理pid对应的信息

    • 需要进行lease管理,pid需要保留一个周期
  2. 如果broker发生故障,producer幂等性还能保证吗?

    • 接管的broker leader 会读取PID meta信息继续服务
  3. MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 与 Producer幂等性

    • server端会缓存每个 PID 在每个 Topic-Partition 上发送的最近 5 个batch 数据

    • producer幂等性 条件下,MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION需要<=5

      • 大于5时,遇到失败重试,server无法准确判断重复(毕竟只cache 5个batch)
    • MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION >1 时可以保证order吗?

    • producer幂等下是可行的。Sequence Number决定顺序
  4. kafka开启了幂等就万无一失了吗?

    • producer故障重启情况,仍然是同一Node但新的PID的retry

      • 本质上仍然是跨PID的重复,kafka无法保证幂等性
    • 业务应用将同一消息重复调用producer.send,如分布在不同的PID client

      • 这种情况kafka无法保证幂等性
      • 需要业务应用做幂等处理