- Published on
Kafka 最佳实践
- Authors
- Name
- Guming
Kafka 最佳实践
下面为PPT的节选和个人的补充:
Kafka 基本配置及性能优化
硬件要求
集群规模 | 内存 | CPU | 存储 | |
---|---|---|---|---|
Kafka Brokers | 3+ | 24GB+(for small)64GB+(for large) | 多核(12cpu+core +),并允许超线程 | 6+ x1TB 的专属磁盘(RAID 或 JBOD |
Zookeeper | 3(for small)5(for large) | 8GB+(for small)24GB+(for large) | 2 core+ | SSD 用于中间的日志传输 |
OS 调优
- OS page cache
- 通过调整/proc/sys/vm/dirty_background_ratio /proc/sys/vm/dirty_ratio来调优性能
- 尽量分配与所有日志的激活日志段大小相同的页缓存大小
- wiki
- fd 限制:100k+;
- 禁用 swapping
- TCP 调优;
- JVM 配置
- JDK 8 并且使用 G1 垃圾收集器;
- 至少要分配 6-8 GB 的堆内存
磁盘存储
- 使用多块磁盘,并配置为 Kafka 专用的磁盘;
- JBOD vs RAID10
- RAID10 成本相对高
- JBOD(Just a Bunch of Disks) 简单说就是普通磁盘
- JBOD限制
- 比如磁盘失败将导致Kafka异常关闭
- 跨磁盘数据不保证一致性
- 多级目录
- 社区也正在解决这么问题,可以关注 KIP 112、113:
- 必要的工具用于管理 JBOD;
- 自动化的分区管理;
- 磁盘损坏时,Broker 可以将 replicas 迁移到好的磁盘上;
- 在同一个 Broker 的磁盘间 reassign replicas;
- RAID 10 的特点:
- 可以允许单磁盘的损坏;
- 性能和保护;
- 不同磁盘间的负载均衡;
- 高命中来减少 space;
- 单一的 mount point;
- SSD
- 昂贵的选项
- 文件系统:
- 使用 EXT 或 XFS;
- Issue on NFS
- SNA NAS
基本监控
- CPU 负载
- 网络带宽
- 文件句柄数
- 磁盘空间
- 磁盘 IO 性能
- JVM GC 信息
- ZooKeeper 监控
Kafka Replication
Replication介绍
- Partition 有两种副本:Leader,Follower;
- Leader 负责维护 in-sync-replicas(ISR)
- replica.lag.time.max.ms:默认为10000,如果 follower 落后于 leader 的消息数超过这个数值时,leader 就将 follower 从 isr 列表中移除;
- min.insync.replica:Producer 端使用来用于保证持久性
Under Replicated Partitions
- JMX指标:kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
- 可能原因
- broker挂了
- controller问题
- zk问题
- 网络问题
- 解决办法
- 调整ISR参数,比如:
- min.insync.replica
- replica.lag.time.max.ms
- num.replica.fetchers 默认为1,用于从 leader 同步数据的 fetcher 线程数
- 增加broker
- 调整ISR参数,比如:
Controller
- 负责管理 partition 生命周期;
- 避免 Controller’s ZK会话超时:
- ISR 抖动;
- ZK Server 性能问题;
- Broker 长时间的 GC;
- 网络 IO 问题;
- 监控:
- kafka.controller:type=KafkaController,name=ActiveControllerCount- jmx
- LeaderElectionRate
Unclean leader 选举
允许不在 isr 中 replica 被选举为 leader。
- 这是 Availability 和 Correctness 之间选择,Kafka 默认选择了可用性;
- unclean.leader.election.enable:默认为 true,即允许不在 isr 中 replica 选为 leader,这个配置可以全局配置,也可以在 topic 级别配置;
- 监控:kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec - JMX
- default下一版本将改变
Broker 配置
基本配置
- log.retention.
{ms, minutes, hours}
, log.retention.bytes - message.max.bytes, replica.fetch.max.bytes
- delete.topic.enable:默认为 false,是否允许通过 admin tool 来删除 topic;
- unclean.leader.election.enable = false
- min.insync.replicas = 2
- 当 Producer 的 acks 设置为 all 或 -1 时,min.insync.replicas 代表了必须进行确认的最小 replica 数,如果不够的话 Producer 将会报 NotEnoughReplicas 或 NotEnoughReplicasAfterAppend 异常
- replica.lag.time.max.ms 超过这个时间没有发送请求的话,follower 将从 isr 中移除)
- num.replica.fetchers = 1 用于从 leader 同步数据的 fetcher 线程数
- replica.fetch.response.max.bytes
- zookeeper.session.timeout.ms = 30s
- num.io.threads:默认为8,KafkaRequestHandlerPool 的大小
Cluster评估
- Broker 评估
- 每个 Broker 的 Partition 数不应该超过2k
- 控制
Partition <= 25GB
;
- 集群评估(Broker 的数量根据以下条件配置)
- 数据保留时间
- 集群的流量大小
- 集群扩容:
- 磁盘使用率小于60%
- 网络使用率小于75%
- 集群监控
- 确保topic分区分布尽量均匀
- 确保broker节点不会出现磁盘、带宽耗尽
Broker 监控
- Partition 数:kafka.server:type=ReplicaManager,name=PartitionCount - JMX
- Leader 副本数:kafka.server:type=ReplicaManager,name=LeaderCount - JMX
- ISR 扩容/缩容率:kafka.server:type=ReplicaManager,name=IsrExpandsPerSec - JMX
- 读写速率:Message in rate/Byte in rate/Byte out rate
- 网络请求的平均空闲率:NetworkProcessorAvgIdlePercent
- 请求处理平均空闲率:RequestHandlerAvgIdlePercent
Topic 评估
- partition 数
- Partition 数应该至少与最大 consumer group 中 consumer 线程数一致
- 对于使用频繁的 topic,应该设置更多的 partition
- 控制 partition小于等于25GB 左右
- 考虑应用未来的增长(可以使用一种机制进行自动扩容)
- 使用带 key 的 topic
- 相同的key 进入相同的partition
- partition 扩容:当 partition 的数据量超过一个阈值时应该自动扩容(实际上还应该考虑网络流量)
分区选择
- 根据TPS的要求设置 partition 数:
- 假设 Producer 单 partition 的吞吐量为 P
- consumer 消费一个 partition 的吞吐量为 C
- 而要求的吞吐量为 T
- 那么 partition 数至少应该大于 T/P、T/C 的最大值
- 更多分区意味着更多的文件句柄、消息处理延时和更多的内存使用
- 一条消息只有其被同步到 isr 的所有 broker 上后,才能被消费,partition 越多,不同节点之间同步就越多,这可能会带来毫秒级甚至数十毫秒级的延迟
- 对于每一个 segment,在 broker 都会有一个对应的 index 和实际数据文件,而对于 Kafka Broker,它将会对于每个 segment 每个 index 和数据文件都会打开相应的 file handle(可以理解为 fd),因此,partition 越多,将会带来更多的 fd
- Producer 和 Consumer 都会按照 partition 去缓存数据,每个 partition 都会带来数十 KB 的消耗,partition 越多, Client 将会占用更多的内存
- 参考
Quotas
- 避免恶意客户端并维护SLA - 安全认证
- 设定字节率阈值限制
- 监控throttle-rate,byte-rate
- replica.fecth.response.max.bytes: 设置follower副本fetch请求response大小
- 限制带宽: kafka-reassign-partitions.sh --throttle options
Kafka Producer
- 使用Java版本producer
- 使用kafka-producer-perf-test.sh测试
- 设置好内存、cpu、batch、压缩等参数
- batch.size: 越大,TPS越大,延时也越大
- linger.ms: 越大,TPS越大,延时也越大
- max.in.flight.requests.per.connection: 增加TPS,影响消息接收顺序
- compression.type: 设置压缩类型,提升TPS
- acks: 设置消息持久性级别
- 避免发送大消息(会使用更多内存,降低broker处理)
性能调优
- 如果吞吐量小于网络带宽
- 增加线程;
- 提高 batch.size;
- 增加更多 producer 实例;
- 增加 partition 数;
- 设置 acks=-1 时,如果延迟增大:可以增大 num.replica.fetchers(follower 同步数据的线程数)来调解
- 跨数据中心的传输:增加 socket 缓冲区设置以及 OS tcp 缓冲区设置
监控指标
- batch-size-avg
- compression-rate-avg
- waiting-threads
- buffer-available-bytes
- record-queue-time-max
- record-send-rate
- records-per-request-avg
Kafka Consumer
- 使用 kafka-consumer-perf-test.sh 测试环境;
- 吞吐量问题:
- partition 数不够
- OS缓存命中太低,分配更多页缓存
- 应用的处理逻辑
- offset topic
- offsets.topic.replication.factor:默认为3;
- offsets.retention.minutes:默认为1440,即 1day;
- MonitorISR,topicsize;
- offset commit慢的问题:异步 commit 或 手动 commit
基本配置
- fetch.min.bytes 、fetch.max.wait.ms fetch最小bytes和最大等待时间
- max.poll.interval.ms:调用 poll() 之后延迟的最大时间,超过这个时间没有调用 poll() 的话,就会认为这个 consumer 挂掉了,将会进行 rebalance
- max.poll.records:当调用 poll() 之后返回最大的 record 数,默认为500;
- session.timeout.ms;
- Consumer Rebalance
- check timeouts
- check processing times/logic
- GC Issues
- 网络配置
监控
监控消费速度
- Consumer Lag:consumer offset 与 the end of log(partition 可以消费的最大 offset) 的差值
- 监控
- metric 监控:records-lag-max;
- 通过 bin/kafka-consumer-groups.sh 查看
- 用于 consumer 监控的 LinkedIn’s Burrow
- 减少 Lag
- 分析 consumer:是 GC 问题还是 Consumer hang 住了
- 增加 Consumer 的线程
- 增加分区数和 consumer 线程
- 提高业务处理速度
如何保证数据不丢?
- producer
- retries = MAX
- acks=all
- max.in.flight.requests.per.connection = 1
- broker
- replication factor >= 3
- min.insync.replicas = 2
- 关闭unclean leader选举
- consumer
- auto.offset.commit = false
- 消息被处理后提交offset