消息队列 RabbitMQ

什么是MQ

MQ即MessageQueue 消息队列

最直接的作用 : 将同步的事件驱动改为异步的消息驱动

解耦

Producer和Consumer都只跟中间件进行交互,而不需要互相进行交互。这意味着,在Producer发送消息时,不需要考虑有没有Consumer或多个Consumer。

异步

消息并不是从Producer发送出来后,就立即交由Consumer处理,而是在MQ中间件中暂存下来,等到Consumer启动后。自行去MQ中间件上处理。错开了生产-消费的时间

削峰

有了MQ做消息暂存,当Producer发送消息的速度和Consumer处理消息的速度不一致时,MQ就可以起到削峰填谷的作用

主流MQ

1
2
3
Kafka       吞吐量非常大,性能非常好,技术生态完整       功能单一             分布式日志收集
RabbitMQ 消息可靠性高,功能全面 吞吐量较低,消息挤压影响性能 企业系统内部调用
RocketMQ 高吞吐,高性能,高可用 技术生态相对不完整 几乎全场景

处理路径

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
┌──────────────┐       AMQP Frame       ┌──────────────┐
│ TCP Socket │ ──────────────────▶ │ Connection │
└──────────────┘ └──────────────┘
RabbitMQ 的 Socket 资源句柄:用于接收来自 TCP 层的字节流数据,作为 AMQP 协议消息的输入通道
|

┌────────────┐
│ Channel │
└────────────┘
RabbitMQ 的消息处理通道:将接收到的 AMQP 字节流解析为结构化命令。识别控制字段(如 basic.publish)、
参数(如 exchange、routingKey)和消息体,并调用对应的内部方法进行处理。

basic.publish ▼
─────────────────────────────────────▶ ┌────────────┐
│ Exchange │
└────────────┘
根据 exchange 的类型(fanout、direct、topic、headers)与 routingKey,
匹配绑定规则(binding),将消息分发到一个或多个符合条件的队列。


┌────────────┐
│ Queue │
└────────────┘
按 FIFO 原则缓存等待消费。支持内存与磁盘混合存储、消息确认机制(ACK/NACK)
TTL、死信等处理策略。


消费者收到消息

常用消息场景

WrokQueue

生产者生成的消息分发给多个消费者的消息

1
2
P ---->  Queue  ----> Consumer1
----> Consumer2

Consumer 对每个消息必须应答

Consumer端每消费完要给消息,需要给服务端一个ack应答,这个应答可以是手动应答,也可以是自动应答。如果Consumer一直没有给服务端应答,那么服务端不断将这条消息进行投递,就会不断地消耗系统资源。

RabbitMQ并不完全保证消息安全

关键的message不能因为服务出现问题而被忽略。如果想要保证消息不丢失,在RabbitMQ中,需要同时将队列和消息的durable属性都设置成true。但是durable仍然不能完全保证消息不被丢失。RabbitMQ是定期将缓存写入到磁盘中,所以当出现异常时,缓存中的数据会丢失。

消息如何在多个Consumer之间分发

默认采用了fair dispatch 。即消息轮询,向消费者发送。

Publish/Subscribe (发布/订阅)

Producer 向交换器发布消息,交换器向所有消费者发送消息(指定Fanout队列)

1
2
3
P ----> X ----> amq.gen-RQ6 -> C1
----> amq.gen-As8 -> C2

Routing 基于内容的路由

通过 routingKey 来查询特定队列向其中发送消息

1
channel.baseicPublish(EXCHANGE_NAME, routingKey"debug", props : null, message.getBytes());
1
2
3
4
P ----> error ----> amq.gen-S9b -> C1
----> info ---->
----> warn ----> amq.gen.Ag1 -> C2
----> error ---->

Topics 基于话题的路由

routingKey 作为话题进行转发

1
2
3
4
P ----> X ----> *.orange.* ----> Q1 ----> C1
----> *.*.rabbit
---->Q2 ----> C2
----> lazy.#

Headers 头部路由机制

定制键值对,指定交换器类型为Headers。设置channel绑定的头部信息,通过匹配头部信息转发

Publisher Confirms 发送者消息确认

通过阻塞等待接收发布消息的确认

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
channel.confirmSelect();

// 单条消息发布
for(int i = 0 ; i< MESSAGE_COUNT;i++){
String body = String.value(i);
channel.basicPublish("",queue,null,body.getBytes());
channel.waitForConfirmsOrDie(5_000);
}

// 批量消息发布

int batchSize = 100 ;
int outstandingMessageCount = 0 ;
long start = System.nonoTime();
for(int i = 0 ; i < MESSAGE_COUNT ; i++ ){
String body = String.valueOf(i);
ch.basicPublish("", queue, null , body.getBytes());
outstandingMessageCount++ ;

if(outstandingMessageCount == batchSize){
ch.waitForConfirmsOrDie(5_000);
outstandingMessageCount = 0 ;
}
}

if(outstandingMessageCount > 0){
ch.waitForConFirmsOrDie(5_000);
}

// 异步确认

channel.addConfirmListener(ConfirmCallback var1 , ConfirmCallback var2);

队列结构

Classic 结构队列

FIFO队列的简单实现。如果持久化置位为true,则将消息存储进磁盘,如果持久化置位为false,则将消息加入缓存中

Quorum 仲裁队列

数据一定会持久化,不能自动删除,提高数据的安全性

不适合的场景

  1. 一些临时使用的队列:比如transient临时队列,exclusive独占队列,或者经常会修改和删除的队列
  2. 对消息低延迟要求高,一致性算法会影响消息的延迟
  3. 对数据安全性要求不高: Quorum队列需要消费者手动通知或者生产者手动确认
  4. 队列消息积压严重

Stream流式队列

一个基于日志模型的新型队列实现,支持 Kafka 式的顺序流、高并发、可回溯消费,同时保留 RabbitMQ 的灵活路由机制。

类似于 Kafka 的 Topic,每个 Stream 是一个 可追加的持久化日志序列

消息写入后按序编号,消费者使用 offset 拉取

Stream 可以配置为多分区,便于水平扩展和并行消费

每个分区就是一个独立的顺序日志

每条消息有全局唯一的 offset

消费者可指定 offset(earliest/latest/指定位置)启动

支持消费组状态保存(需要启用 Broker-side offset tracking)

1
2
3
4
params.put("x-queue-type","stream");
params.put("x-max-length-bytes",20_000_000_000L);
params.put("x-stream-max-segment-size-bytes",100_000_000);
channel.queueDeclare(QUEUE_NAME,true,false,false,params);

死信队列

用于接收其他队列中未能正常消费的消息的队列

1
2
3
4
x-dead-letter-exchange:mirror.dlExchange  对应的死信交换机
x-dead-letter-routing-key: mirror.messageExchange1.messageQueue1 死信交换机routing-key
x-message-ttl: 3000 消息过期时间
durable: true 持久化

何时会产生死信

  1. 消息被消费者确认拒绝,消费者把requeue参数设置为ture,并且在消费后,向RabbitMQ返回拒绝
  2. 消息达到预设的TTL时限还一直没有被消费
  3. 消息由于队列已经达到最长长度限制而被丢掉

Sharding 插件

通过分队列的方式提高消息的吞吐量

image

设置sharding 策略

image

定义交换机

image

最终会绑定3个虚拟队列

在消费端绑定虚拟交换机

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public  class ShardingConsumer {
public static final String QUEUENAME= "sharding_exchange";
public static void main(String[] args) throws IOException, TimeoutException{
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.65.193");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("admin");
factory.setVirtualHost("/mirror");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

channel.queueDeclare(QUEUENAME,false ,false,false,null);

Consumer myconsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties
properties, byte[] body) throws IOException{
String routingKey = envelope.getRoutingKey();
System.out.println("routingKey >" + routingKey);
String contentType = properties.getContentType();
System.out.println("contentType >" + contentType);
long deliveryTag = envelope.getDeliveryTag();
System.out.println("deliveryTag >" + deliveryTag);
System.out.println("content:" + new String(body,"UTF-8"));
}
};
// 需要进行3次接收 队列的分发。
String consumerFlag1 = channel.basicConsume(QUEUENAME,true , myconsumer);
System.out.println("c1:" + consumerFlag1);
String consumerFlag2 = channel.basicConsume(QUEUENAME,true, myconsumer);
System.out.println("c2:" + consumerFlag2);
String consumerFlag3 = channel.basicConsume(QUEUENAME,true, myconsumer);
System.out.println("c3:" + consumerFlag3);
}
}

使用Sharding后,因为只需要指定虚拟的Exchange,并不能确定消息最终会发送到哪条队列。虽然策略是均匀保存测率,但是不能完全保证均匀。

所以适用于消息延迟要求不严格,以及对消费顺序没有任何要求的场景。

另外尽量不要使用碎片队列接收,会导致队列的消息分配会受到影响。


消息队列 RabbitMQ
http://gadoid.io/2025/05/22/消息队列-RabbitMQ/
作者
Codfish
发布于
2025年5月22日
许可协议