消息中间件使用场景

  • 异步发送(验证码,短信,邮件)
  • MySQL和Redis,ES之间的数据同步
  • 分布式事务(最终一致性)
  • 作为发布/订阅系统实现一个微服务系统间的观察者模式。
  • 连接流计算任务和数据。
  • 用于将消息广播给大量的接收者,数据同步。

流量控制(削峰填谷)

错峰与流控。

问题:如何避免过多的请求压垮系统?

设计思路:使用消息队列隔离网关和后端服务,以达到流量控制和保护后端服务的目的。

代价:

  • 增加系统调用的链路,导致总体的响应时间变成。
  • 同步调用变成了异步调用,增加系统的复杂度。
  • 成本问题,MQ的高性能和高可用。

常见的限流算法:

  • 固定窗口算法
  • 滑动窗口算法
  • 漏桶算法
  • 令牌桶算法:有一个程序,在单位时间内只发放固定的令牌到令牌桶中,规定服务在处理请求之前必须先从令牌桶中先获取一个令牌,如果令牌桶中没有令牌,则拒绝请求。这样就可以保证单位时间内,能处理的请求不超过发放令牌的数量。

服务解耦

微服务的通信模式:

  • 调用链模式:A -> B -> C
  • 聚合器模式:有点像DDD中和聚合。
  • 基于事件的异步模式:有点像DDD中的事件。

分布式事务

产生的原因

  1. 存储层拆分:数据库分库分表。
  2. 服务层拆分:业务的服务化。

1. 两阶段提交协议

非常经典的强一致性、中心化的原子提交协议。

会假设满足以下条件:

  • 存在一个节点作为协调者,其他节点作为参与者,且协调者和参与者可以正常的网络通信。
  • 所有的节点都是采用预写式日志,日志被写入后被保存在可靠的存储设备上,即使节点损坏也不会导致日志数据的丢失。
  • 所有节点都不会被永久性的损坏,即使损坏后仍然是可以恢复的。

两个阶段分别是:Commit-Request和Commit阶段。

image-20240321223615537

2. TCC

3. 本地消费表

消息不丢失

RabbitMQ

消息发送的过程:

image-20230705012322187

消息在任何一个阶段都有可能丢失。

生产者确认机制

RabbitMQ提供了publisher confirm机制来避免消息发送到MQ过程中丢失。消息发送到MQ后,会返回一个ACK消息给发送者,表示是否成功。

  • nack publisher-confirm:在到达exchange的过程中失败。
  • ack publisher-return:在达到消息队列过程中失败。
  • ack publisher-confirm:成功到达消息队列。

消息失败后处理

  • 回调方法重发
  • 记录日志
  • 保存到数据库后定时重发,成功后删除表中记录。

消息持久化

MQ默认是内存存储消息,开启持久化功能可用确保缓存在MQ中的消息不丢失。

交换机持久化

@Bean
public DirectExchange simpleExchange() {
    // 交换机名称,是否持久化,当没有queue与其绑定时是否自动删除
    return new DirectExchange("simple.direct", true, true);
}

队列持久化

@Bean
public Queue simpleQueue() {
    // 使用QueueBuilder构建队列,durable就是持久化的
    return QueueBuilder.durable("simple.queue").build();
}

消息持久化

SpringAMQP中的消息默认就是持久化的,可用通过MessageProperties中的DeliveryMode来指定。

消费者确认机制

RabbitMQ支持消费者确认机制,当消费者处理消息后,向MQ发送一条ACK消息,MQ收到ACK消息后才会删除消息。SpringAMQP允许三种配置方式。

  1. manual:手动ACK,需要在业务代码结束后,调用API发送ACK。
  2. auto:自动ACK,由Spring监听Listentce代码是否出现异常,没有异常返回ACK;出现异常返回NACK。
  3. none:关闭ACK,MQ假定消费者获取消息后会成功处理,因此消息投递后立即被删除。

利用Spring的retry机制,在消费者处理异常时利用本地重试,当重试次数到达了设置的重试次数后,若消息依然失败,则将消息投递到异常交换机,由人工处理。

消息重复消费

RabbitMQ

消费者处理完成后,还没来得及给MQ发送确认消息,消费者就挂了,这样MQ会认为该消息还没被处理。当消费者重启后,又会去MQ中获取消息,导致消息的重复消费。

解决

  1. 每条消息设置一个业务唯一ID,当消费的时候去校验业务ID是否存在。
  2. 幂等:【分布式锁、数据库锁(悲观锁、乐观锁)】。

延迟队列

进入队列后的消息会被延迟消费。

场景:超时订单等。

延迟队列=死信交换机+TTL(生存时间)

死信交换机

当一个队列中的消息满足下列情况之一时,可以成为死信(dead letter):

  • 消费者使用basic.reject或basic.nack声明消费失败,并且消息的requeue参数设置为false。
  • 消息是一个过期消息,超时无人消费。
  • 要投递的队列消息堆积满了,最早的消息可能成为死信。

如果该队列配置了dead-letter-exchange属性,指定了一个交换机,那么队列中的死信就会投递到这个交换机中,而这个交换机被称为死信交换机(Dead Letter Exchange,简称DLX)。

@Bean
public Queue ttlQueue() {
    return QueueBuilder.durable("simple.queue") // 指定队列名称,并持久化
        .ttl(10000) // 设置队列的超时时间,10秒
        .deadLetterExchange("dl.direct")// 指定死信交互机
        build();
}

TTL

TTL,也就是Time To Live。如果一个队列中的消息TTL结束后仍未消费,则会成为死信,TTL超时分为两种情况:

  1. 消息所在的队列设置了存货时间。
  2. 消息本身设置了存货时间。

注:具体的存活时间=min(1, 2)。

插件DelayExchange

  • 声明一个交换机,设置delayed属性为true
  • 发送消息时,设置请求头x-delay,值为超时时间。

消息堆积

当生产者发送消息的速度超过消费者处理消息的速度,就会导致队列中的消息堆积,直到队列存储的消息达到上限后。之后发送的消息就会成为死信,可能会被丢弃。

RabbitMQ

解决思路:

  1. 添加更多的消费者,提高消费速度。
  2. 在消费者内开启线程池加快消息处理速度。
  3. 扩大队列容量,提高堆积上限 -> 惰性队列。

惰性队列

特点:

  • 接收消息后直接存储到磁盘,而不是内存。
  • 消费者要消费消息的时候,才会从磁盘读取消息到内存。
  • 支持数百万条的消息存储
@Bean
public Queue lazyQueue() {
    return QueueBuilder
        .durable("lazy.queue")
        .lazy() // 开启 x-queue-mode为lazy
        .build();
}

或者

@RabbitListener(queuesToDeclare = @Queue(
	name = "lazy.queue",
    durable = "true",
    arguments = @Argument(name = "x-queue-mode", value="lazy")
))
public void listenLazyQueue(String msg) {
    
}

高可用机制

RabbitMQ

使用集群来保证高可用

普通集群

  • 会在集群中各个节点共享交换机,队列元信息(其它节点队列的引用)。不包含队列中的信息。
  • 当访问集群节点时,如果队列不在该节点,会从数据所在节点传递到当前节点并返回。
  • 队列所在节点宕机,队列中的消息就会消失。

镜像集群

本质是主从模式,特点如下:

  • 交换机,队列,消息会在各个MQ的镜像节点之间同步备份。
  • 创建队列的节点被称为该队列的主节点,备份到的其他节点叫做该队列的镜像节点。
  • 一个队列的主节点可能是另外一个队列的镜像节点。
  • 所有操作都是主节点完成,然后同步给镜像节点。(如果在主从完成前,主节点已经宕机,可能出现数据丢失)
  • 主节点宕机后,镜像节点会替代成新的主节点。

仲裁队列

仲裁队列是3.8以后才有的新功能,用来代替镜像队列,特征如下:

  • 与镜像队列一样,都是主从模式,支持主从数据同步
  • 主从同步基于Raft协议,保证了强一致性。
@Bean
public Queue quorumQueue() {
    return QueueBuilder
        .durable("quorum.queue")
        .quorum() // 仲裁队列
        .build();
}

高性能设计

数据存储和清理

消息队列选型的基本标准

  1. 功能需求
  2. 非功能需求
    1. 必须开源。出了问题,还有机会自己修改源码来修复该问题。
    2. 较流行,且有一定的社区活跃的。大家都在用,遇到的问题概率较小,在生态方面也会比较好。

作为一款合格的消息队列产品,必须具备的几个特性:

  • 可靠性:消息可靠传递,可靠存储、不丢消息。
  • 高可用:支持集群,确保不会因为某个节点宕机导致服务不可用,也不能丢消息。
  • 高性能:具备足够好的性能,能满足绝大多数场景的性能需求。

消费队列的消费模型

  1. 队列模型:FIFO。
  2. 发布-订阅模型。

RabbitMQ

RabbitMQ用的是队列模型。

image-20240321211731737

如果希望同一份数据被多个消费者消费,可以配置Exchange将消息投递到多个队列中,这样每个队列都存储了完整的消息数据。

变相地实现了发布-订阅模型。

image-20240321220220396

RocketMQ

RocketMQ是标准的“发布-订阅”模型。

一个Broker中会存在多个Topic。

image-20240321220329558

几乎所有的MQ都是“请求 - 确认“机制。

image-20240321220352492

当Broker中的Topic下的Queue收到生产者的消息后,会向生产者发送确认的响应。如果生产者没有收到MQ的确认,此时会重发消息。

对于消费者来说,消费者收到消息并且完成自己的业务逻辑后,会给MQ发送消费成功的确认,MQ收到确认后,认为消息消费成功了,否则它会给消费者重新发送该消息,直到收到消费成功的确认。