https://www.geeksforgeeks.org/message-queues-system-design/
消息队列
A message queue is a form of Asynchronous communication and data transfer mechanism. For messages sent between various systems inside a broader software architecture, it serves as a temporary storage and routing system.
消息队列是一种异步通信和数据传输机制 在广泛的软件架构中的系统间发送消息,充当临时存储和路由系统
- 生产者Producer: any program that produces data for sharing and send to the message queue can be considered producers.
- 消息队列MessageQueue: the messages stored and managed by a data structure util the consumers consume them. It serves as buffer between producers and consumers.
- 消费者Consumer: consume and processed the messages from the message queue.
- 消息Message: a message can be defined as a unit of data that is transferred between producers and consumers.
消息队列要解决的问题
- 异步化:核心业务不需要同步等待非核心业务的情况下,可以异步化非核心业务,提高核心业务的响应速度;
- 功能解耦:系统间非强依赖的功能模块,通过消息队列进行解耦,降低系统复杂度;
- 流量削峰:系统承载不了过多的流量,或者不需要过多的流量,通过消息队列进行缓冲,保证系统可用的情况下,处理业务;
- 水平扩展业务:数据重复使用:一份数据往往需要多处使用,分发消费的过程可以交给消息队列;
消息队列会带来什么问题
- 重复消费问题;
- 消息丟失问题;
- 消息积压问题;
- 顺序消费场景的消费乱序问题;
消息队列的特性
In a distributed system, message queue is one of the most important component.
1. 异步通讯和并发处理
- Asynchronous Communication: In scenarios where the core workflow does not need to wait synchronously for non-core operations, the non-core operations can be made asynchronous which can improves the response speed of the core workflow.
生产者发送消息后无需等待消费者处理,提高响应速度,一些非核心、非及时响应的业务中,可以提升系统的响应速度,有更好的用户体验。
因为异步的特性,对于需要并发处理的多任务,可以通过消息队列完成,如果是在单节点内并发处理有较高的复杂度、资源的占用等问题,借助MQ的高吞吐、高可用,能够让这类问题更简化。
2. 系统解耦和扩展性
Message queues decouple applications from each other, allowing them to be developed independently. This makes systems more flexible, scalable and easier to maintain.
MQ可以解耦系统,允许它们独立开发、独立扩展,这样做让系统更弹性、更具扩展性、更好维护。 并且在业务增长时,通过增加消费者的数量、MQ的队列数量,提升消息的处理能力,天然横向扩展。
3. 流量削峰和负载均衡
When a system is unable to handle massive requests or when it is not necessary to process all incoming traffic immediately, message queue can act as a buffer.
当一个系统不能立即处理大量的请求或不需要立即处理所有到来的流量,此时MQ可以被当作一个缓冲区。
For example, if the system can handle 2000 requests per second, but during the peek time of 5000 requests per second, the system can leave 3000 requests to be stored in the MQ, preventing the system from breaking down during peek time. The requests in the MQ can be consumed by the server at a constant rate.
一个系统能够每秒处理2000请求,但是在流量高峰每秒有5000个请求,系统就可以将3000请求存储在MQ中,以防止过大的流量超出系统负载,避免宕机,在MQ中的请求可以被服务器以恒定的速率消费。 并且MQ的客户端可以将消息均匀路由到同一个Topic中的不同的消息队列,再通过订阅的模式,不同的消费者从不同的队列中获取消息,以此实现负载均衡;
4. 数据冗余和集成
When a piece of data needs to be consumed by multiple services, the message queue can distribute the message to different services. At the same time, that ensures data consistency across different parts of the system, because all modules receive the same data from the message queue.
当一份数据需要被多个服务消费时,消息队列可以将消息分发到不同的服务,同时也确保了数据一致性,当不同的系统模块处于不同的消费者组时,可以从MQ中消费相同的数据。 数据集成:大型系统的日志、指标等数据通常需要聚合、集成的方式进行处理,此时消息队列更像是一个可靠的数据源。
5. 提升系统可靠性
- Message-Persistence Message queue middleware typically offer message-persistence features. When a message is sent to the MQ, it is stored in a reliable storage. This ensures that even if the MQ server restarts, the messages are not lost.
消息中间件通常提供消息持久化机制,当一个消息被发送到队列中,它就会被存储到一个可靠存储中,这保证了即使消息队列服务重启,消息仍然不会丢失,并且在消费失败时,可以选择合适的重试机制,有更高的可用性。
- Delivery Guarantees Message queue middleware provide different levels of delivery guarantees.
MQ中间件通常提供不同等级的投递保证,如:At Most Once、At Least Once、Exactly Only Once;
- Redundancy and Fault-Tolerance Multiple MQ servers can be set up in a cluster, and messages can be replicated across these servers. They can detect failures of producers, consumers. When a consumer fails, the MQ can stop sending messages to that particular consumer and route them to other available consumers.
大部分MQ服务可以以集群方式启动,消息可以备份在不同的Server节点中,以此保证消息队列本身的可靠性。 它们可以检测生产者、消费者的故障。当消费者发生故障时,MQ 可以停止向该特定消费者发送消息并将其路由到其他可用的消费者。
7. 最终一致性的保证
大部分的消息队列都提供最终一致性的保证,因为消息队列的持久化、副本等机制首先保证了消息的可靠性,在此基础之上,对可能发生的宕机、超时、网络波动,消息队列能够通过「重试机制」,配合业务上的幂等消费保证最终一致性。
消息队列的典型架构
以最常用的发布订阅模式的架构为例:
- Broker:消息队列的集群节点。通常分布式消息队列系统都是集群化的,以此来保证更高的可靠性、扩展性。每一个节点都可以成为Broker。
- Topic:一类业务消息的标识。在业务中可以将同一类、同一业务的消息归为一个Topic,通常同一Topic下的消息有着相同的数据结构、相当于生产端和消费段的消息约定。
- Partition:消息分区。Partition是归属于Topic的,一个Topic的消息通常存储在不同的Partition中,以此来实现更高的并行消费能力。
- 生产端投递消息到不同的Partition分区。通常会以负载均衡的方式投递,就可以做到消费时也是均衡的。
- 同一个消费者组下的消费者消费不同的Parition分区,实现并行消费并且不会重复消费。由此可得到Parition分区数量的最佳值是:消费者数量;
- Partition可以看作是一个队列,但Partition的消息再被消费后,并不会删除,它会记录每个消费者组的消费位点。以此来实现不同的消费者组之间可以重复消费消息。一份消息对多个系统消费,天然的高扩展性。
- 如果某个Topic的业务增长,可以通过扩展Partition来提高并行能力。
- Partition是消息的存储单元,因此有副本机制,来保证可靠性,通常副本会存储在不同的Broker中,来提高可靠性。
- 客户端:
- 生产者:消息的生产端;为了负载均衡和并行能力,生产者通常采用Hash算法、轮询等方式将同一个Topic的消息发送到不同的分区;
- 消费者:消息的消费端;消息队列需要支持消费者的动态扩缩容,就需要在消费者数量、状态发生变化时,能够重新分配分区,也叫做Rebalance。消费消息通常采用Pull的方式,而非Poll,这样消费者可以根据自身的消费情况控制消费速率,通过Offset位点来避免重复消费。
- 协调服务(Coordinator)
- Partition有副本机制,那故障转移的能力也是不可少的,就会存在Leader(负责读写)、Follower(仅负责备份);
- 协调服务通常会管理集群的元数据、监控集群状态、故障转移;
- 常见的协调服务有:Zookeeper、Etcd等;
消息投递语意
1. 三种语意
常见的消息队列一般是以下三种语意:
- At Most Once:
- 生产端:最多一次。生产者只发一次,无论成功失败;(最多有一个消息投递成功)
- 消费端:先提交offset,再消费,最多消费一次,可能消费失败;
- At Least Once:
- 生产端:生产者未收到成功响应,进行重试;(最少有一个消息投递成功)
- 消费端:先消费,再提交offset,可能重复消费;
- Exactly Only Once:
- 生产端:既不重复、也不丢失;
- 消费端:手动提交 + 消费端借助其他组件的事务支持,实现事务;
2. 分布式环境下的取舍
在分布式系统中,消息的重复是可以容忍的,甚至某些情况下是需要的(消费者消费过程中宕机,未Commit),但是消息的丟失往往是不能容忍的。因此"At Most Once"基本不能满足要求。
因此只需要讨论下"At Least Once"和"Exactly Only Once":
(1)实现复杂度考虑:
- At Least Once:只需要通过重试机制保证消息不丟失;
- 生产者发送消息,如果broker未确认、超时,则重试;
- Broker持久化消息后,就保证了最少一次;消费失败也可重新投递;
- Exactly Only Once:需要全链路(生产、传输、消费)严格协调,避免重复:
- 生产者消息去重(消息唯一id + 事务日志)
- Broker全局去重(需要记录所有消息的状态)
- 消费者幂等处理(通过全局唯一标识)
(2)性能考虑:
- At Least Once:仅依赖本地重试、确认机制,对吞吐量影响很小,能够有更高的并发能力;
- Exactly Only Once:全局锁、事务日志、协调协议等等都会大幅降低吞吐量;(Kafka开启事务后吞吐量下降20%~30%)
(3)分布式一致性考虑:
- At Least Once:通过重试机制,可以保证最终一致性,消费的幂等只需要业务消费者自行控制即可;
- Exactly Only Once:正好一次与最终一致性冲突,分布式系统中往往存在超时的问题,通过重试机制+幂等有更高的可用性;
Rebalance
分布式消息队列的
消息队列的实现和对比
- kafka
- Pulsar
- RocketMQ(MetaQ)
- RabbitMQ
- Redis