消息队列:Kafka消费端消费过程

Kafka消费流程和offset管理、幂等消费和消息积压的处理

消费流程

1. 消费者组协调

消费者启动前需要配置集群地址、所属消费组等参数,Listener需要指定要订阅的主题等;

  • bootstrap.servers
    :Kafka集群地址。
  • group.id
    :消费者所属的消费者组。
  • key.deserializer
    /
    value.deserializer
    :消息键值的反序列化器。
  • auto.offset.reset
    :加入新组时的消费策略:无偏移量时从最早(
    earliest
    )或最新(
    latest
    )开始消费。

消费者启动后,向Broker的协调者注册,请求加入消费者组:

  • 协调器负责消费者组的分区配置;哪些消费者消费哪些分区;
  • 协调器负责管理消费者成员,当有新的消费者或现有消费者离开,协调器都会感知并触发
    rebalance
  • 消费者会向协调者发送
    joinGroup
    请求,申请加入特定的消费者组(
    group.id
    ),并触发
    rebalance
  • session.timeout.ms
    :协调者判断消费者存活的超时时间。

topic分区的分配:

  • 协调器根据分配策略,给新加入的消费者分配分区;
  • 分配策略:Range、RoundRobin、Sticky;
  • 如果消费者数量大于分区数,则会存在空闲的消费者;

2. 开始消费

消费者会批量从分区拉取数据;

  • max.poll.records
    :单次拉取的最大消息数。可以根据消费的能力,设置适当的大小;可以作为一个简单的限流参数;
  • fetch.min.bytes
  • fetch.max.wait.ms

消息以字节的形式拉取,需要执行反序列化;

消费完成后,管理offset:

  • 自动提交:
    • enable.auto.commit
      :开启自动提交offset(默认
      true
    • auto.commit.interval.ms
      :自动提交offset的间隔,定时提交偏移量到
      __consumer_offsets
      主题
  • 手动提交:
    • enable.auto.commit
      :设置为false;
    • commitSync()
      确保提交成功,但阻塞后续操作
    • commitAsync()
      不阻塞,但需处理回调异常

3. 消费者的心跳检测

心跳线程:

  • 后台线程定期(
    heartbeat.interval.ms
    )向协调者发送心跳。
  • 若协调者在
    session.timeout.ms
    内未收到心跳,触发再平衡。

4. 优雅停机

当消费者感知到下机时:

  • 调用
    consumer.wakeup()
    consumer.close()
    ,提交未完成的偏移量。
  • 触发再平衡,确保其他消费者接管分区。

消费方式和offset管理

启动时消费位点:

  • none:从保存的offset继续消费;如果没有offset(新组),抛出异常;
  • latest:从保存的offset继续消费;如果没有offset(新组),则只消费上线后的新数据;
  • earliest:从保存的offset继续消费;无提交的offset时(新组),从头开始消费;

offset的存储:

  • offset保存在 Kafka 一个内置的topic中:
    __consumer_offsets
  • offset由
    Group Coordinator
    进行管理;
  • 使用key-value方式存储:
    • key:
      group.id + topic + partition.id
    • value: 消费的offset位置;

offset的提交:

  • 自动提交;
# 是否自动提交
enable.auto.commit = true
# 自动提交频率
auto.commit.interval.ms = 1000
  • 手动提交;
    • 同步提交:commitSync,可以进行失败重试;
    • 异步提交:commitAsync,失败需要自行额外保证;

消息路由策略

  • RangeAssignor:partitions/consumers 按照范围分配;
    • 按照字母顺序,依次分配;
    • 如果不是正好平均,就会数据倾斜;
    • 触发再平衡,则重新计算;
  • RoundRobin:依次轮流分配;
    • 触发再平衡,则重新计算;
  • Cooperative Sticy:粘性分配;
    • 尽量均匀的分配;
    • 消费者变动时,尽量保留现有的分配方式减少重新分配的Partition数量;

由参数:

partition.assignment.strategy
决定,value配置对应的消费分配策略实现:

org.apache.kafka.clients.consumer.RangeAssignor org.apache.kafka.clients.consumer.RoundRobinAssignor org.apache.kafka.clients.consumer.StickyAssignor org.apache.kafka.clients.consumer.CooperativeStickyAssignor

消费者保证不丢失消息

1、==使用手动提交==:

enable.auto.commit = false

  • 同步:commitSync,有自动的重试机制;可靠性高;
  • 异步:commitAsync,不管是否提交成功;

保证处理完成,再commit,中间宕机,未commit的消息,还可以继续消费;

如何幂等消费

通常Kafka以最少一次的语意,那么就有可能重复消费:消费端处理完消息,提交前宕机,offset未更新,则可能重新消费此消息;

重复消费场景

消费者宕机、session超时等导致失联,并且开启了自动提交,已经消费消息,但是未提交offset;

消费者重启后,向

GroupCoordinator
查询消费offset,此时就会查询到已经消费的offset,造成重复消费;

避免重复消费

Kafka端是无法解决重复消费问题的,只能业务自行处理消费幂等;

需要结合业务,业务代码中,根据业务的消息唯一Id,在处理消息的业务层面做幂等;重复消费时,进行过滤;

实现精准一次 = 消费者事务 + 幂等

处理积压问题

上线前做好压测

上线之前应该在测试环境,进行压测,对Kafka消息流量有一个预估;

  • 确定消费端消费配置(并行度、线程数)所能达到的消费速率;
  • 预估线上的大致流量、峰值流量、峰值持续时间等;

如果生产中数据积压

第一要务是优先恢复业务:消费端扩容、Kafka分区扩容,提升消费能力;(k8s)

事后排查优化

1、提高消费的并行度,增加Kafka分区,增加消费端节点;提高消费吞吐量;

2、消费方式使用批量消费

  • fetch.max.bytes
    :一次拉取消息的总字节数;默认50Mb
  • max.poll.records
    :一次拉取消息的数量;默认500条;

3、优化消费端业务代码,分解业务逻辑,有些逻辑可能不一定非要在消费过程中处理;如果存在复杂的IO操作,考虑是否可异步化;

消费失败的重试策略

Kafka本身没有对消息重试的支持,不像RocketMQ有默认的有限次的重试机制;kafka需要业务自行处理重试:

  1. 消费失败后,不提交offset,下次再次消费;
  2. 消息重投到当前消费的topic;
  3. 设置重试Topic;
  4. 落库记录,定时任务重试;

实现顺序消费

1、首先Kafka的分区中的消息顺序是按照投递顺序排列的,可以根据这个特性,只使用一个Partition接收需要顺序消费的消息,然后使用单线程消费; 2、如果一定要多线程消费;也只能是多线程拉取,最终以一定的策略,还是需要单线程处理逻辑;

可以考虑乱序程度、乱序延迟;比如,最多消息乱序不超过5s,我们可以多线程落库,然后单线程批量拉取消息消费,此时应该每次拉取预留5s的时间,保证拉取到的消息都是顺序,且没有落后的消息;