Kafka - 幂等性Producer
Producer幂等性
背景
kafka 提供的at least once 语义,意味着发送的消息可能会传递一次或多次。日常处理中,人们真正想要的是“exactly once”语义,从而不会传递重复的消息
出现重复消息的常见原因有两个:(这里前提是,producer仅传输一次的情况下broker不会重复存储)
- 如果客户端尝试向集群发送消息并收到网络错误,重试可能会导致重复
- 如果消费者从主题读取消息然后崩溃,那么当消费者重新启动或另一个实例接管消费时,新消费者将从原始消费者的最后一个已知位置开始
第二个问题kafka通过offset commit来保障,第一个问题需要特殊处理
Kafka Producer 如何解决第一个问题
kafka通过在 at least once 的基础上加上幂等性来做到 exactly once
当然这是有约束的 exactly once,比如kafka要求单一会话内有效或者跨会话使用事务性有效等
这里我们先分析最简单的情况,在单一会话内如何做到幂等性
要做到幂等性,需要解决以下问题: 1. server 需要有能力识别一条数据到底是不是重复的?常用的手段是通过 唯一键/唯一 ID 来判断 2. 唯一键应该选择什么粒度?对于分布式存储系统来说,全局唯一要求太高,需要增加单独的功能。在可靠性的要求下,还需要分布式唯一ID的功能H/A,引入了复杂度。kafka使用的粒度是,topic partition 方式做区隔。重复数据的判断让 partition 的 leader 去判断处理 3. 分区粒度实现唯一键会不会有其他问题?这里需要考虑的问题是当一个Partition有来自多个 client 写入的情况,这些client之间是很难做到使用同一个唯一键,所以还需要对client进行标识,这就是PID。同一个 client 在处理多个 Topic-Partition 时是使用同一个 PID。 4. 如何区别每一条消息呢?sequence numbers,client 发送的每条消息都会带相应的 sequence number,broker端就是根据这个值来判断数据是否重复
kafka Producer 幂等性的实现原理:
PID(Producer ID)+ Topic-Partition + sequence numbers 还有前提:
- acks = all
- ENABLE_IDEMPOTENCE_CONFIG = true
kafka 如何实现的?
-
Producer PID 申请
- 每个 Producer 在初始化时都会被分配一个唯一的 PID。对于一个给定的 PID,sequence number 将会从0开始自增,每个 Topic-Partition 都会有一个独立的 sequence number。Producer在发送数据时,将会给每条 msg 标识一个 sequence number,Server 也就是通过这个来验证数据是否重复
- PID 是全局唯一的,Producer故障后重新启动后会被分配一个新的PID,这也是幂等性无法做到跨会话的一个原因
-
Server PID 管理
- 向 ZooKeeper 申请,zk中有一个 /latest_producer_id_block 节点(临时借点?请看下面的思考),每个 Broker向 zk 申请一个 PID 段
- 申请 PID 段的好处是减少与zk的交互
-
Sequence Numbers 生成
- 略。原子计数器即可
思考:
-
server Pid 的过期时间,最简单的方法是与connection关联,失去连接即pid过期.但现实是在connection端开后,server仍然需要处理pid对应的信息
- 需要进行lease管理,pid需要保留一个周期
-
如果broker发生故障,producer幂等性还能保证吗?
- 接管的broker leader 会读取PID meta信息继续服务
-
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION 与 Producer幂等性
-
server端会缓存每个 PID 在每个 Topic-Partition 上发送的最近 5 个batch 数据
-
producer幂等性 条件下,MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION需要<=5
- 大于5时,遇到失败重试,server无法准确判断重复(毕竟只cache 5个batch)
-
MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION >1 时可以保证order吗?
- producer幂等下是可行的。Sequence Number决定顺序
-
-
kafka开启了幂等就万无一失了吗?
-
producer故障重启情况,仍然是同一Node但新的PID的retry
- 本质上仍然是跨PID的重复,kafka无法保证幂等性
-
业务应用将同一消息重复调用producer.send,如分布在不同的PID client
- 这种情况kafka无法保证幂等性
- 需要业务应用做幂等处理
-