消息中间件篇
消息中间件使用场景
- 异步发送(验证码,短信,邮件)
- MySQL和Redis,ES之间的数据同步
- 分布式事务(最终一致性)
- 作为发布/订阅系统实现一个微服务系统间的观察者模式。
- 连接流计算任务和数据。
- 用于将消息广播给大量的接收者,数据同步。
流量控制(削峰填谷)
错峰与流控。
问题:如何避免过多的请求压垮系统?
设计思路:使用消息队列隔离网关和后端服务,以达到流量控制和保护后端服务的目的。
代价:
- 增加系统调用的链路,导致总体的响应时间变成。
- 同步调用变成了异步调用,增加系统的复杂度。
- 成本问题,MQ的高性能和高可用。
常见的限流算法:
- 固定窗口算法
- 滑动窗口算法
- 漏桶算法
- 令牌桶算法:有一个程序,在单位时间内只发放固定的令牌到令牌桶中,规定服务在处理请求之前必须先从令牌桶中先获取一个令牌,如果令牌桶中没有令牌,则拒绝请求。这样就可以保证单位时间内,能处理的请求不超过发放令牌的数量。
服务解耦
微服务的通信模式:
- 调用链模式:A -> B -> C
- 聚合器模式:有点像DDD中和聚合。
- 基于事件的异步模式:有点像DDD中的事件。
分布式事务
产生的原因
- 存储层拆分:数据库分库分表。
- 服务层拆分:业务的服务化。
1. 两阶段提交协议
非常经典的强一致性、中心化的原子提交协议。
会假设满足以下条件:
- 存在一个节点作为协调者,其他节点作为参与者,且协调者和参与者可以正常的网络通信。
- 所有的节点都是采用预写式日志,日志被写入后被保存在可靠的存储设备上,即使节点损坏也不会导致日志数据的丢失。
- 所有节点都不会被永久性的损坏,即使损坏后仍然是可以恢复的。
两个阶段分别是:Commit-Request和Commit阶段。

2. TCC
3. 本地消费表
消息不丢失
RabbitMQ
消息发送的过程:

消息在任何一个阶段都有可能丢失。
生产者确认机制
RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ后,会返回一个ACK消息给发送者,表示是否成功。
- nack publisher-confirm:在到达exchange的过程中失败。
- ack publisher-return:在达到消息队列过程中失败。
- ack publisher-confirm:成功到达消息队列。
消息失败后处理
- 回调方法重发
- 记录日志
- 保存到数据库后定时重发,成功后删除表中记录。
消息持久化
MQ默认是内存存储消息,开启持久化功能可用确保缓存在MQ中的消息不丢失。
交换机持久化
@Bean
public DirectExchange simpleExchange() {
// 交换机名称,是否持久化,当没有queue与其绑定时是否自动删除
return new DirectExchange("simple.direct", true, true);
}
队列持久化
@Bean
public Queue simpleQueue() {
// 使用QueueBuilder构建队列,durable就是持久化的
return QueueBuilder.durable("simple.queue").build();
}
消息持久化
SpringAMQP中的消息默认就是持久化的,可用通过MessageProperties中的DeliveryMode来指定。
消费者确认机制
RabbitMQ支持消费者确认机制,当消费者处理消息后,向MQ发送一条ACK消息,MQ收到ACK消息后才会删除消息。SpringAMQP允许三种配置方式。
- manual:手动ACK,需要在业务代码结束后,调用API发送ACK。
- auto:自动ACK,由Spring监听Listentce代码是否出现异常,没有异常返回ACK;出现异常返回NACK。
- none:关闭ACK,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除。
利用Spring的retry机制,在消费者处理异常时利用本地重试,当重试次数到达了设置的重试次数后,若消息依然失败,则将消息投递到异常交换机,由人工处理。
消息重复消费
RabbitMQ
消费者处理完成后,还没来得及给MQ发送确认消息,消费者就挂了,这样MQ会认为该消息还没被处理。当消费者重启后,又会去MQ中获取消息,导致消息的重复消费。
解决
- 每条消息设置一个业务唯一ID,当消费的时候去校验业务ID是否存在。
- 幂等:【分布式锁、数据库锁(悲观锁、乐观锁)】。
延迟队列
进入队列后的消息会被延迟消费。
场景:超时订单等。
延迟队列=死信交换机+TTL(生存时间)
死信交换机
当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):
- 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false。
- 消息是一个过期消息,超时无人消费。
- 要投递的队列消息堆积满了,最早的消息可能成为死信。
如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机被称为死信交换机(Dead Letter Exchange,简称DLX)。
@Bean
public Queue ttlQueue() {
return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
.ttl(10000) // 设置队列的超时时间,10秒
.deadLetterExchange("dl.direct")// 指定死信交互机
build();
}
TTL
TTL,也就是Time To Live。如果一个队列中的消息TTL结束后仍未消费,则会成为死信,TTL超时分为两种情况:
- 消息所在的队列设置了存货时间。
- 消息本身设置了存货时间。
注:具体的存活时间=min(1, 2)。
插件DelayExchange
- 声明一个交换机,设置
delayed属性为true。 - 发送消息时,设置请求头
x-delay,值为超时时间。
消息堆积
当生产者发送消息的速度超过消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储的消息达到上限后。之后发送的消息就会成为死信,可能会被丢弃。
RabbitMQ
解决思路:
- 添加更多的消费者,提高消费速度。
- 在消费者内开启线程池加快消息处理速度。
- 扩大队列容量,提高堆积上限 -> 惰性队列。
惰性队列
特点:
- 接收消息后直接存储到磁盘,而不是内存。
- 消费者要消费消息的时候,才会从磁盘读取消息到内存。
- 支持数百万条的消息存储
@Bean
public Queue lazyQueue() {
return QueueBuilder
.durable("lazy.queue")
.lazy() // 开启 x-queue-mode为lazy
.build();
}
或者
@RabbitListener(queuesToDeclare = @Queue(
name = "lazy.queue",
durable = "true",
arguments = @Argument(name = "x-queue-mode", value="lazy")
))
public void listenLazyQueue(String msg) {
}
高可用机制
RabbitMQ
使用集群来保证高可用
普通集群
- 会在集群中各个节点共享交换机,队列元信息(其它节点队列的引用)。不包含队列中的信息。
- 当访问集群节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回。
- 队列所在节点宕机,队列中的消息就会消失。
镜像集群
本质是主从模式,特点如下:
- 交换机,队列,消息会在各个MQ的镜像节点之间同步备份。
- 创建队列的节点被称为该队列的主节点,备份到的其他节点叫做该队列的镜像节点。
- 一个队列的主节点可能是另外一个队列的镜像节点。
- 所有操作都是主节点完成,然后同步给镜像节点。(如果在主从完成前,主节点已经宕机,可能出现数据丢失)
- 主节点宕机后,镜像节点会替代成新的主节点。
仲裁队列
仲裁队列是3.8以后才有的新功能,用来代替镜像队列,特征如下:
- 与镜像队列一样,都是主从模式,支持主从数据同步
- 主从同步基于Raft协议,保证了强一致性。
@Bean
public Queue quorumQueue() {
return QueueBuilder
.durable("quorum.queue")
.quorum() // 仲裁队列
.build();
}
高性能设计
数据存储和清理
消息队列选型的基本标准
- 功能需求
- 非功能需求
- 必须开源。出了问题,还有机会自己修改源码来修复该问题。
- 较流行,且有一定的社区活跃的。大家都在用,遇到的问题概率较小,在生态方面也会比较好。
作为一款合格的消息队列产品,必须具备的几个特性:
- 可靠性:消息可靠传递,可靠存储、不丢消息。
- 高可用:支持集群,确保不会因为某个节点宕机导致服务不可用,也不能丢消息。
- 高性能:具备足够好的性能,能满足绝大多数场景的性能需求。
消费队列的消费模型
- 队列模型:FIFO。
- 发布-订阅模型。
RabbitMQ
RabbitMQ用的是队列模型。

如果希望同一份数据被多个消费者消费,可以配置Exchange将消息投递到多个队列中,这样每个队列都存储了完整的消息数据。
变相地实现了发布-订阅模型。

RocketMQ
RocketMQ是标准的“发布-订阅”模型。
一个Broker中会存在多个Topic。

几乎所有的MQ都是“请求 - 确认“机制。

当Broker中的Topic下的Queue收到生产者的消息后,会向生产者发送确认的响应。如果生产者没有收到MQ的确认,此时会重发消息。
对于消费者来说,消费者收到消息并且完成自己的业务逻辑后,会给MQ发送消费成功的确认,MQ收到确认后,认为消息消费成功了,否则它会给消费者重新发送该消息,直到收到消费成功的确认。