消费流程
1. 消费者组协调
消费者启动前需要配置集群地址、所属消费组等参数,Listener需要指定要订阅的主题等;
- bootstrap.servers:Kafka集群地址。
- group.id:消费者所属的消费者组。
- key.deserializer/value.deserializer:消息键值的反序列化器。
- auto.offset.reset:加入新组时的消费策略:无偏移量时从最早(earliest)或最新(latest)开始消费。
消费者启动后,向Broker的协调者注册,请求加入消费者组:
- 协调器负责消费者组的分区配置;哪些消费者消费哪些分区;
- 协调器负责管理消费者成员,当有新的消费者或现有消费者离开,协调器都会感知并触发rebalance;
- 消费者会向协调者发送joinGroup请求,申请加入特定的消费者组(group.id),并触发rebalance;
- session.timeout.ms:协调者判断消费者存活的超时时间。
topic分区的分配:
- 协调器根据分配策略,给新加入的消费者分配分区;
- 分配策略:Range、RoundRobin、Sticky;
- 如果消费者数量大于分区数,则会存在空闲的消费者;
2. 开始消费
消费者会批量从分区拉取数据;
- max.poll.records:单次拉取的最大消息数。可以根据消费的能力,设置适当的大小;可以作为一个简单的限流参数;
- fetch.min.bytes
- fetch.max.wait.ms
消息以字节的形式拉取,需要执行反序列化;
消费完成后,管理offset:
- 自动提交:
- enable.auto.commit:开启自动提交offset(默认true)
- auto.commit.interval.ms:自动提交offset的间隔,定时提交偏移量到__consumer_offsets主题
- 手动提交:
- enable.auto.commit:设置为false;
- commitSync()确保提交成功,但阻塞后续操作
- commitAsync()不阻塞,但需处理回调异常
3. 消费者的心跳检测
心跳线程:
- 后台线程定期(heartbeat.interval.ms)向协调者发送心跳。
- 若协调者在session.timeout.ms内未收到心跳,触发再平衡。
4. 优雅停机
当消费者感知到下机时:
- 调用consumer.wakeup()或consumer.close(),提交未完成的偏移量。
- 触发再平衡,确保其他消费者接管分区。
消费方式和offset管理
启动时消费位点:
- none:从保存的offset继续消费;如果没有offset(新组),抛出异常;
- latest:从保存的offset继续消费;如果没有offset(新组),则只消费上线后的新数据;
- earliest:从保存的offset继续消费;无提交的offset时(新组),从头开始消费;
offset的存储:
- offset保存在 Kafka 一个内置的topic中:__consumer_offsets;
- offset由Group Coordinator进行管理;
- 使用key-value方式存储:
- key:group.id + topic + partition.id
- value: 消费的offset位置;
- key:
offset的提交:
- 自动提交;
# 是否自动提交 enable.auto.commit = true # 自动提交频率 auto.commit.interval.ms = 1000
- 手动提交;
- 同步提交:commitSync,可以进行失败重试;
- 异步提交:commitAsync,失败需要自行额外保证;
消息路由策略
- RangeAssignor:partitions/consumers 按照范围分配;
- 按照字母顺序,依次分配;
- 如果不是正好平均,就会数据倾斜;
- 触发再平衡,则重新计算;
- RoundRobin:依次轮流分配;
- 触发再平衡,则重新计算;
- Cooperative Sticy:粘性分配;
- 尽量均匀的分配;
- 消费者变动时,尽量保留现有的分配方式减少重新分配的Partition数量;
由参数:
org.apache.kafka.clients.consumer.RangeAssignor org.apache.kafka.clients.consumer.RoundRobinAssignor org.apache.kafka.clients.consumer.StickyAssignor org.apache.kafka.clients.consumer.CooperativeStickyAssignor
消费者保证不丢失消息
1、==使用手动提交==:
- 同步:commitSync,有自动的重试机制;可靠性高;
- 异步:commitAsync,不管是否提交成功;
保证处理完成,再commit,中间宕机,未commit的消息,还可以继续消费;
如何幂等消费
通常Kafka以最少一次的语意,那么就有可能重复消费:消费端处理完消息,提交前宕机,offset未更新,则可能重新消费此消息;
重复消费场景
消费者宕机、session超时等导致失联,并且开启了自动提交,已经消费消息,但是未提交offset;
消费者重启后,向
避免重复消费
Kafka端是无法解决重复消费问题的,只能业务自行处理消费幂等;
需要结合业务,业务代码中,根据业务的消息唯一Id,在处理消息的业务层面做幂等;重复消费时,进行过滤;
实现精准一次 = 消费者事务 + 幂等
处理积压问题
上线前做好压测
上线之前应该在测试环境,进行压测,对Kafka消息流量有一个预估;
- 确定消费端消费配置(并行度、线程数)所能达到的消费速率;
- 预估线上的大致流量、峰值流量、峰值持续时间等;
如果生产中数据积压
第一要务是优先恢复业务:消费端扩容、Kafka分区扩容,提升消费能力;(k8s)
事后排查优化
1、提高消费的并行度,增加Kafka分区,增加消费端节点;提高消费吞吐量;
2、消费方式使用批量消费;
- fetch.max.bytes:一次拉取消息的总字节数;默认50Mb
- max.poll.records:一次拉取消息的数量;默认500条;
3、优化消费端业务代码,分解业务逻辑,有些逻辑可能不一定非要在消费过程中处理;如果存在复杂的IO操作,考虑是否可异步化;
消费失败的重试策略
Kafka本身没有对消息重试的支持,不像RocketMQ有默认的有限次的重试机制;kafka需要业务自行处理重试:
- 消费失败后,不提交offset,下次再次消费;
- 消息重投到当前消费的topic;
- 设置重试Topic;
- 落库记录,定时任务重试;
实现顺序消费
1、首先Kafka的分区中的消息顺序是按照投递顺序排列的,可以根据这个特性,只使用一个Partition接收需要顺序消费的消息,然后使用单线程消费; 2、如果一定要多线程消费;也只能是多线程拉取,最终以一定的策略,还是需要单线程处理逻辑;
可以考虑乱序程度、乱序延迟;比如,最多消息乱序不超过5s,我们可以多线程落库,然后单线程批量拉取消息消费,此时应该每次拉取预留5s的时间,保证拉取到的消息都是顺序,且没有落后的消息;