Kafka 最佳实践
Kafka 最佳实践
下面为PPT的节选和个人的补充:
Kafka 基本配置及性能优化
硬件要求
集群规模 | 内存 | CPU | 存储 | |
---|---|---|---|---|
Kafka Brokers | 3+ | 24GB+(for small)64GB+(for large) | 多核(12cpu+core +),并允许超线程 | 6+ x1TB 的专属磁盘(RAID 或 JBOD |
Zookeeper | 3(for small)5(for large) | 8GB+(for small)24GB+(for large) | 2 core+ | SSD 用于中间的日志传输 |
OS 调优
-
OS page cache
- 通过调整/proc/sys/vm/dirty_background_ratio /proc/sys/vm/dirty_ratio来调优性能
- 尽量分配与所有日志的激活日志段大小相同的页缓存大小
- wiki
-
fd 限制:100k+;
-
禁用 swapping
-
TCP 调优;
-
JVM 配置
- JDK 8 并且使用 G1 垃圾收集器;
- 至少要分配 6-8 GB 的堆内存
磁盘存储
-
使用多块磁盘,并配置为 Kafka 专用的磁盘;
-
JBOD vs RAID10
- RAID10 成本相对高
-
JBOD(Just a Bunch of Disks) 简单说就是普通磁盘
-
JBOD限制
-
比如磁盘失败将导致Kafka异常关闭
-
跨磁盘数据不保证一致性
-
多级目录
-
社区也正在解决这么问题,可以关注 KIP 112、113:
- 必要的工具用于管理 JBOD;
- 自动化的分区管理;
- 磁盘损坏时,Broker 可以将 replicas 迁移到好的磁盘上;
- 在同一个 Broker 的磁盘间 reassign replicas;
-
-
RAID 10 的特点:
- 可以允许单磁盘的损坏;
- 性能和保护;
- 不同磁盘间的负载均衡;
- 高命中来减少 space;
- 单一的 mount point;
-
SSD
- 昂贵的选项
-
文件系统:
- 使用 EXT 或 XFS;
- Issue on NFS
- SNA NAS
基本监控
- CPU 负载
- 网络带宽
- 文件句柄数
- 磁盘空间
- 磁盘 IO 性能
- JVM GC 信息
- ZooKeeper 监控
Kafka Replication
Replication介绍
-
Partition 有两种副本:Leader,Follower;
-
Leader 负责维护 in-sync-replicas(ISR)
- replica.lag.time.max.ms:默认为10000,如果 follower 落后于 leader 的消息数超过这个数值时,leader 就将 follower 从 isr 列表中移除;
- min.insync.replica:Producer 端使用来用于保证持久性
Under Replicated Partitions
-
JMX指标:kafka.server:type=ReplicaManager,name=UnderReplicatedPartitions
-
可能原因
- broker挂了
- controller问题
- zk问题
- 网络问题
-
解决办法
-
调整ISR参数,比如:
- min.insync.replica
- replica.lag.time.max.ms
- num.replica.fetchers 默认为1,用于从 leader 同步数据的 fetcher 线程数
-
增加broker
-
Controller
-
负责管理 partition 生命周期;
-
避免 Controller’s ZK会话超时:
-
ISR 抖动;
- ZK Server 性能问题;
- Broker 长时间的 GC;
- 网络 IO 问题;
-
监控:
- kafka.controller:type=KafkaController,name=ActiveControllerCount- jmx
- LeaderElectionRate
Unclean leader 选举
-
允许不在 isr 中 replica 被选举为 leader。
- 这是 Availability 和 Correctness 之间选择,Kafka 默认选择了可用性;
- unclean.leader.election.enable:默认为 true,即允许不在 isr 中 replica 选为 leader,这个配置可以全局配置,也可以在 topic 级别配置;
- 监控:kafka.controller:type=ControllerStats,name=UncleanLeaderElectionsPerSec - JMX
- default下一版本将改变
Broker 配置
基本配置
-
log.retention.{ms, minutes, hours} , log.retention.bytes
-
message.max.bytes, replica.fetch.max.bytes
-
delete.topic.enable:默认为 false,是否允许通过 admin tool 来删除 topic;
-
unclean.leader.election.enable = false
-
min.insync.replicas = 2
- 当 Producer 的 acks 设置为 all 或 -1 时,min.insync.replicas 代表了必须进行确认的最小 replica 数,如果不够的话 Producer 将会报 NotEnoughReplicas 或 NotEnoughReplicasAfterAppend 异常
-
replica.lag.time.max.ms 超过这个时间没有发送请求的话,follower 将从 isr 中移除)
-
num.replica.fetchers = 1 用于从 leader 同步数据的 fetcher 线程数
-
replica.fetch.response.max.bytes
-
zookeeper.session.timeout.ms = 30s
-
num.io.threads:默认为8,KafkaRequestHandlerPool 的大小
Cluster评估
-
Broker 评估
- 每个 Broker 的 Partition 数不应该超过2k
- 控制partition <=25GB;
-
集群评估(Broker 的数量根据以下条件配置)
- 数据保留时间
- 集群的流量大小
-
集群扩容:
- 磁盘使用率<60%
- 网络使用率<75%
-
集群监控
- 确保topic分区分布尽量均匀
- 确保broker节点不会出现磁盘、带宽耗尽
Broker 监控
- Partition 数:kafka.server:type=ReplicaManager,name=PartitionCount - JMX
- Leader 副本数:kafka.server:type=ReplicaManager,name=LeaderCount - JMX
- ISR 扩容/缩容率:kafka.server:type=ReplicaManager,name=IsrExpandsPerSec - JMX
- 读写速率:Message in rate/Byte in rate/Byte out rate
- 网络请求的平均空闲率:NetworkProcessorAvgIdlePercent
- 请求处理平均空闲率:RequestHandlerAvgIdlePercent
Topic 评估
-
partition 数
- Partition 数应该至少与最大 consumer group 中 consumer 线程数一致
- 对于使用频繁的 topic,应该设置更多的 partition
- 控制 partition<=25GB 左右
- 考虑应用未来的增长(可以使用一种机制进行自动扩容)
-
使用带 key 的 topic
- 相同的key 进入相同的partition
-
partition 扩容:当 partition 的数据量超过一个阈值时应该自动扩容(实际上还应该考虑网络流量)
分区选择
-
根据TPS的要求设置 partition 数:
- 假设 Producer 单 partition 的吞吐量为 P
- consumer 消费一个 partition 的吞吐量为 C
- 而要求的吞吐量为 T
- 那么 partition 数至少应该大于 T/P、T/C 的最大值
-
更多分区意味着更多的文件句柄、消息处理延时和更多的内存使用
- 一条消息只有其被同步到 isr 的所有 broker 上后,才能被消费,partition 越多,不同节点之间同步就越多,这可能会带来毫秒级甚至数十毫秒级的延迟
- 对于每一个 segment,在 broker 都会有一个对应的 index 和实际数据文件,而对于 Kafka Broker,它将会对于每个 segment 每个 index 和数据文件都会打开相应的 file handle(可以理解为 fd),因此,partition 越多,将会带来更多的 fd
- Producer 和 Consumer 都会按照 partition 去缓存数据,每个 partition 都会带来数十 KB 的消耗,partition 越多, Client 将会占用更多的内存
Quotas
- 避免恶意客户端并维护SLA - 安全认证
- 设定字节率阈值限制
- 监控throttle-rate,byte-rate
- replica.fecth.response.max.bytes: 设置follower副本fetch请求response大小
- 限制带宽: kafka-reassign-partitions.sh --throttle options
Kafka Producer
-
使用Java版本producer
-
使用kafka-producer-perf-test.sh测试
-
设置好内存、cpu、batch、压缩等参数
- batch.size: 越大,TPS越大,延时也越大
- linger.ms: 越大,TPS越大,延时也越大
- max.in.flight.requests.per.connection: 增加TPS,影响消息接收顺序
- compression.type: 设置压缩类型,提升TPS
- acks: 设置消息持久性级别
-
避免发送大消息(会使用更多内存,降低broker处理)
性能调优
-
如果吞吐量小于网络带宽
- 增加线程;
- 提高 batch.size;
- 增加更多 producer 实例;
- 增加 partition 数;
-
设置 acks=-1 时,如果延迟增大:可以增大 num.replica.fetchers(follower 同步数据的线程数)来调解
-
跨数据中心的传输:增加 socket 缓冲区设置以及 OS tcp 缓冲区设置
监控指标
- batch-size-avg
- compression-rate-avg
- waiting-threads
- buffer-available-bytes
- record-queue-time-max
- record-send-rate
- records-per-request-avg
Kafka Consumer
-
使用 kafka-consumer-perf-test.sh 测试环境;
-
吞吐量问题:
- partition 数不够
- OS缓存命中太低,分配更多页缓存
- 应用的处理逻辑
-
offset topic
- offsets.topic.replication.factor:默认为3;
- offsets.retention.minutes:默认为1440,即 1day;
- MonitorISR,topicsize;
-
offset commit慢的问题:异步 commit 或 手动 commit
基本配置
-
fetch.min.bytes 、fetch.max.wait.ms fetch最小bytes和最大等待时间
-
max.poll.interval.ms:调用 poll() 之后延迟的最大时间,超过这个时间没有调用 poll() 的话,就会认为这个 consumer 挂掉了,将会进行 rebalance
-
max.poll.records:当调用 poll() 之后返回最大的 record 数,默认为500;
-
session.timeout.ms;
-
Consumer Rebalance
- check timeouts
- check processing times/logic
- GC Issues
-
网络配置
监控
监控消费速度
-
Consumer Lag:consumer offset 与 the end of log(partition 可以消费的最大 offset) 的差值
-
监控
- metric 监控:records-lag-max;
- 通过 bin/kafka-consumer-groups.sh 查看
- 用于 consumer 监控的 LinkedIn’s Burrow
-
减少 Lag
- 分析 consumer:是 GC 问题还是 Consumer hang 住了
- 增加 Consumer 的线程
- 增加分区数和 consumer 线程
- 提高业务处理速度
如何保证数据不丢?
- producer
- retries = MAX
- acks=all
- max.in.flight.requests.per.connection = 1
- broker
- replication factor >= 3
- min.insync.replicas = 2
- 关闭unclean leader选举
- consumer
- auto.offset.commit = false
- 消息被处理后提交offset