消息队列
一、为什么会出现消息队列?
削峰、解耦
为啥不使用线程?线程是当前应用内的,限制太多
二、使用消息队列会出现哪些问题
1. 系统可用性降低
系统引入的外部依赖越多,越容易挂掉
2. 系统复杂度提高
消息的重复消费。哪些场景会产生重复的消息?
- 网络闪断
- ack失败,队列以为没有消费
解决方案:
-
业务端保持幂等,例如已取消的订单,又来了一个已取消的消息,则不处理,保持幂等
-
唯一的标识,类似uuid,对已经处理过的消息,不在做二次处理
-
消息表,使用msgId坐主键或者唯一约束
2. 消息丢失
消息会在哪些环节可能丢失?
- 消息在传入过程中丢失
- 队列收到消息,暂存在内存中,还没来得及被消费,队列挂掉了,内存中的数据丢失
- 消费者消费到了这个消息,但是还没有来得及处理,消费者就挂了,而队列以为消息已经被处理
解决方案:
先入事务表(消息表)。 如果事务表也满了呢,根据业务日志找回,因为在实际开发过程中都会打印日志,找到对应的日志,重新写入mq
2.1 针对RabbitMQ消息队列可以有以下措施解决:
- 对于生产者:
- 开启RabbitMQ事务(同步,不推荐)
- 开启confirm模式(异步推荐)
- 对于队列: 开启RabbitMQ持久化
- 对于消费者: 关闭RabbitMQ自动ack,采用手动ack
- 数据一致性问题。使用消息表兜底,如果消息表兜底措施也有问题,人工介入,系统并不能解决所有的问题
三、消息队列是天然的广播(发布)
消费者可各取所需(订阅)
四、天然的涉及到分布式
队列的解耦、异步,就会催生天然的分布式
五、Kafka 和 RabbitMQ 消息队列比对
-
消息传递模型:
RabbitMQ:RabbitMQ是一个消息代理,它实现了高级消息队列协议(AMQP)。它支持多种消息传递模型,包括点对点和发布-订阅模型。
Kafka:Kafka是一个分布式流处理平台,主要用于处理实时数据流。它采用发布-订阅模型,消息被持久化保存在日志中,允许多个消费者以不同的速率消费消息。
-
持久性:
RabbitMQ:RabbitMQ默认将消息保存在内存中,可以配置为将消息持久化到磁盘。这使得RabbitMQ在一些情况下可能会有较低的持久性。
Kafka:Kafka将消息持久化到磁盘,因此能够保证数据的持久性。它适用于需要高吞吐量和持久性的场景,如日志处理和事件溯源。
-
适用场景:
RabbitMQ:适用于传统的消息队列场景,如任务队列、事件驱动等。它提供了更多的消息处理模式,适合需要灵活性的应用。
Kafka:适用于大规模的数据管道和实时数据处理,特别是在日志聚合、事件溯源、和流处理方面表现出色。
-
性能:
RabbitMQ:RabbitMQ的性能也很好,但在某些情况下可能会受到单一队列的限制,因此在需要水平扩展性的场景下可能需要一些额外的配置。
Kafka:Kafka旨在提供高吞吐量和水平扩展性,适用于大规模的数据处理和分布式系统。
-
一致性和可用性:
RabbitMQ:RabbitMQ也提供了高可用性的配置选项,但可能需要一些复杂的设置来实现。
Kafka:Kafka设计为具有高可用性和容错性,可以容忍节点故障。它保证消息的有序性和一致性。
六、消息的顺序消费
保证消息顺序消费的前提是生产者的消息是顺序生产的
顺序消费由消费者业务保证(Hash等操作都可以完成),例如: 对于订单的场景,可以让同一个订单消息根据某个规则(例如hash)都发到一个队列中。
如果是多消费者呢?又如何保证消息消费的顺序性? from chatGPT
- 单一消费者: 如果你的应用程序结构允许,考虑使用单一消费者来处理消息。这样可以确保消息的顺序性。然而,这可能会影响系统的吞吐量。 —— 在我的工作经历中,都是采用这种方式,虽然部署了多个job节点,但是实际触发运行的只要一个节点,即单线程消费。如果是并行任务,应该单独起一个job项目专门处理这一类的任务,通过多消费者来提升处理能力
- 消息预处理: 在消费者处理消息之前,可以在多线程环境中进行预处理,将消息按照一定的规则排序,然后再交给各个线程处理。这样可以保持消息的顺序性。
- 多个队列,按顺序分发: 将消息分发到多个队列,每个队列由一个单独的消费者线程处理。确保消息根据某个规则(例如,消息的顺序号)发送到相应的队列。这样每个队列内的消息是有序的,但不同队列的消息可以并行处理。
在网上我看到一个观点,即业务系统真正需要的是业务事件顺序。
要按业务事件顺序处理消息,您实际上并不需要全局锁或只启动单个consumer,或者只选举一个consumer leader,这些解决方案提供了线性化级别的一致性(Linearizability Consistency),但是在大多数情况下它们太慢且不必要。 实际上,我们只关心因果级别的一致性(Causal Consistency), 只要能找出业务相关的消息,就可以按照事件顺序处理。 为此,我们可以在消费者端为那些因果相关的消息提供状态机,如果消息出现故障(过早),我们可以将其本地存储在“收件箱”中。 稍后,当延迟的消息到达时,我们将状态机触发到下一个状态,然后检查收件箱以查看是否存在下一条期望的消息,或者继续等待下一个消息。
自己的理解: “收件箱”其实就是类似一种共享变量,所有的消费者在消费时都要来访问一下“收件箱”再做逻辑
七、消息产生积压如何处理?
- 临时紧急扩容(如果是docker就很简单,再起几个容器) ——增加消费者
- 建多队列,写分发程序把queue中积压的数据分发到新建的队列中,然后在写多个指定的消费者,消费指定的新建的队列数据。处理完成后再恢复到之前的常规配置
- 消费者开启线程池加快消息处理速度
- 如果是RabbitMQ,可以扩大队列容积,提高堆积上限,采用惰性队列(在声明队列时设置属性
x-queue-mode
为lazy
,即惰性队列)惰性队列的特点:接收到消息后直接存储到磁盘而非内存;消费者要消费消息时才从磁盘读取到内存;支持百万条消息的存储
网站当前构建日期: 2025.01.19