Kafka完整消息生产到投递到集群的过程,以及如何保证在生产端保证消息不丢是、生产幂等、如何保证有序消息等等;
消息从生产到发送的过程
1. 生产者初始化
启动前需要对生产者进行连接、序列化等配置:
- 指定集群地址(bootstrap.servers)
- 指定key、value序列化方式(key.serializer和value.serializer)
启动后,生产者会先从Kafka服务端获取元数据:
- 获取集群元数据;
- 获取Topic分区信息;
- 确定每个Topic的Leader分区所在的broker,确保消息发送到正确的broker;
2. 消息投递
序列化:消息发送之前根据配置,对消息进行序列化;
分区的路由策略:
- 指定分区:消息则直接发送到指定的分区;
- 未指定分区:根据分区策略路由;
- 随机;
- 轮询;
- Hash:根据消息key的hash值确定分区;可以保证具有相同的key,总是发送到同一个分区;
- 自定义路由策略;spring-kafka中实现Partitioner接口;
同步发送:生产者发送消息后会等待 Broker 的响应,只有当收到 Broker 的确认消息后,才会继续发送下一条消息。这种方式可以保证消息的顺序性,但会降低发送性能;
异步发送(默认模式):生产者发送消息后不会等待 Broker 的响应,而是继续发送下一条消息。生产者可以通过回调函数来处理 Broker 的响应,这种方式可以提高发送性能,但可能会导致消息的顺序性无法保证。
- 批处理:kafka在异步发送模式下以批量的方式发送数据,可以大大提高性能,消息会暂存在本地缓冲区内,当消息数达到batch.size(默认16kb)或发送间隔达到liner.ms(默认10ms)就会触发一次批量发送;
- 数据压缩:compression.type指定压缩算法,通过耗时,提高吞吐量,降低带宽;
- broker端默认不会解压消息,会交给消费端解压;
- 如果broker采用了与发送端不同的压缩方式,则会触发解压;
- 如果broker中发生了消息转换,则会触发解压,同时零拷贝失效;
- 以上两种情况发生,则可能导致kafka服务CPU升高;因此,尽量保证全链路数据处理方式一致;
3. Broker接收到消息和应答策略
消息存储:Broker 接收到生产者发送的消息后,会将消息追加到对应的分区的日志文件中。
- 每个分区都有一个日志文件,消息会按照顺序追加到日志文件的末尾;
- 如果存在多个副本,还会触发消息复制到其他副本;
ACK应答机制:生产者发送消息到broker的确认机制;
- ack = 0;只管发,不管broker是否接收到消息,不等待数据落盘;基本不使用;(性能最好)
- ack = 1;消息发送到Leader就返回响应,不管ISR队列是否同步完成;
- ack =-1;消息发送到Leader,并且同步到ISR队列每个副本后,返回成功;(可靠性最高)
发送端如何保证数据不丢失
- 开启ack = -1;保证能落盘,并同步到ISR队列所有follower;最高可靠性保证;
- 适当设置重试retries,发送失败时重试;(开启生产者幂等,防重)
- 分区的副本数 >= 2
- ISR副本数min.insync.replicas>= 2
- Broker端的刷盘策略:可以配置 log.flush.interval.messages和log.flush.interval.ms等参数;
- 每收到 log.flush.interval.messages条消息,就落盘一次;
- 间隔log.flush.interval.ms时长,就落盘一次;
- 每收到
极端场景:
假设开启
ack = -1
,且 replication.factor = 3
、min.insync.replicas = 2
,这种配置已经能够保证99%的场景不丢数据了; 当Kafka的Partiton的主副本都同步完成,消息写入PageCache后,就会返回ack,表示接收消息成功; 但是此时数据并没有落盘,flush()落盘的操作是操作系统来完成的,一般是30s定时刷一次脏页; 此时所有节点宕机,就会丢数据;
发送端幂等/生产者事务
发送端的事务是由幂等实现的;
开启生产者事务:
enable.idempotence=true
,会自动开启消息的重试;
1、每一个生产者会分配一个PID(Producer Id),当发送消息时,消息会携带一个序列号
SeqNumber
,并且自增;
2、服务端每个Partition会为每个Producer,维护最大序列号,以:
PID + PartionId + SeqNumber
为依据,防止消息的重复;
发送端保证数据有序
Kafka的分区中的消息顺序是按照投递顺序排列的,因此还是需要发送端根据业务顺序投递,才能保证消费端==单个分区消费是顺序的==;
要保证发送端顺序,可以:
- 只使用一个分区;
- 对需要顺序消费的消息,指定分区分送;