消息队列:Kafka生产端投递过程

Kafka完整消息生产到投递到集群的过程,以及如何保证在生产端保证消息不丢是、生产幂等、如何保证有序消息等等;

消息从生产到发送的过程

1. 生产者初始化

启动前需要对生产者进行连接、序列化等配置:

  • 指定集群地址(
    bootstrap.servers
  • 指定key、value序列化方式(
    key.serializer
     和 
    value.serializer

启动后,生产者会先从Kafka服务端获取元数据:

  • 获取集群元数据;
  • 获取Topic分区信息;
  • 确定每个Topic的Leader分区所在的broker,确保消息发送到正确的broker;

2. 消息投递

序列化:消息发送之前根据配置,对消息进行序列化;

分区的路由策略:

  • 指定分区:消息则直接发送到指定的分区;
  • 未指定分区:根据分区策略路由;
    • 随机;
    • 轮询;
    • Hash:根据消息key的hash值确定分区;可以保证具有相同的key,总是发送到同一个分区;
  • 自定义路由策略;spring-kafka中实现
    Partitioner
    接口;

同步发送:生产者发送消息后会等待 Broker 的响应,只有当收到 Broker 的确认消息后,才会继续发送下一条消息。这种方式可以保证消息的顺序性,但会降低发送性能;

异步发送(默认模式):生产者发送消息后不会等待 Broker 的响应,而是继续发送下一条消息。生产者可以通过回调函数来处理 Broker 的响应,这种方式可以提高发送性能,但可能会导致消息的顺序性无法保证。

  • 批处理:kafka在异步发送模式下以批量的方式发送数据,可以大大提高性能,消息会暂存在本地缓冲区内,当消息数达到
    batch.size
    (默认16kb)或发送间隔达到
    liner.ms
    (默认10ms)就会触发一次批量发送;
  • 数据压缩
    compression.type
    指定压缩算法,通过耗时,提高吞吐量,降低带宽;
    • broker端默认不会解压消息,会交给消费端解压;
    • 如果broker采用了与发送端不同的压缩方式,则会触发解压;
    • 如果broker中发生了消息转换,则会触发解压,同时零拷贝失效;
    • 以上两种情况发生,则可能导致kafka服务CPU升高;因此,尽量保证全链路数据处理方式一致;

3. Broker接收到消息和应答策略

消息存储:Broker 接收到生产者发送的消息后,会将消息追加到对应的分区的日志文件中。

  • 每个分区都有一个日志文件,消息会按照顺序追加到日志文件的末尾;
  • 如果存在多个副本,还会触发消息复制到其他副本;

ACK应答机制:生产者发送消息到broker的确认机制;

  1. ack = 0;只管发,不管broker是否接收到消息,不等待数据落盘;基本不使用;(性能最好)
  2. ack = 1;消息发送到Leader就返回响应,不管ISR队列是否同步完成;
  3. ack =-1;消息发送到Leader,并且同步到ISR队列每个副本后,返回成功;(可靠性最高)

发送端如何保证数据不丢失

  1. 开启
    ack = -1
    ;保证能落盘,并同步到ISR队列所有follower;最高可靠性保证;
  2. 适当设置重试
    retries
    ,发送失败时重试;(开启生产者幂等,防重)
  3. 分区的副本数 >= 2
  4. ISR副本数
    min.insync.replicas
     >= 2
  5. Broker端的刷盘策略:可以配置 
    log.flush.interval.messages
     和 
    log.flush.interval.ms
     等参数;
    1. 每收到 
      log.flush.interval.messages
      条消息,就落盘一次;
    2. 间隔
      log.flush.interval.ms
      时长,就落盘一次;

极端场景:

假设开启 

ack = -1
,且 
replication.factor = 3
min.insync.replicas = 2
,这种配置已经能够保证99%的场景不丢数据了; 当Kafka的Partiton的主副本都同步完成,消息写入PageCache后,就会返回ack,表示接收消息成功; 但是此时数据并没有落盘,flush()落盘的操作是操作系统来完成的,一般是30s定时刷一次脏页; 此时所有节点宕机,就会丢数据;

发送端幂等/生产者事务

发送端的事务是由幂等实现的;

开启生产者事务:

enable.idempotence=true
,会自动开启消息的重试;

1、每一个生产者会分配一个PID(Producer Id),当发送消息时,消息会携带一个序列号

SeqNumber
,并且自增;

2、服务端每个Partition会为每个Producer,维护最大序列号,以:

PID + PartionId + SeqNumber
为依据,防止消息的重复;

发送端保证数据有序

Kafka的分区中的消息顺序是按照投递顺序排列的,因此还是需要发送端根据业务顺序投递,才能保证消费端==单个分区消费是顺序的==;

要保证发送端顺序,可以:

  • 只使用一个分区;
  • 对需要顺序消费的消息,指定分区分送;