什么是MQ
MQ即MessageQueue 消息队列
最直接的作用 : 将同步的事件驱动改为异步的消息驱动
解耦
Producer和Consumer都只跟中间件进行交互,而不需要互相进行交互。这意味着,在Producer发送消息时,不需要考虑有没有Consumer或多个Consumer。
异步
消息并不是从Producer发送出来后,就立即交由Consumer处理,而是在MQ中间件中暂存下来,等到Consumer启动后。自行去MQ中间件上处理。错开了生产-消费的时间
削峰
有了MQ做消息暂存,当Producer发送消息的速度和Consumer处理消息的速度不一致时,MQ就可以起到削峰填谷的作用
主流MQ
1 2 3
| Kafka 吞吐量非常大,性能非常好,技术生态完整 功能单一 分布式日志收集 RabbitMQ 消息可靠性高,功能全面 吞吐量较低,消息挤压影响性能 企业系统内部调用 RocketMQ 高吞吐,高性能,高可用 技术生态相对不完整 几乎全场景
|
处理路径
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| ┌──────────────┐ AMQP Frame ┌──────────────┐ │ TCP Socket │ ──────────────────▶ │ Connection │ └──────────────┘ └──────────────┘ RabbitMQ 的 Socket 资源句柄:用于接收来自 TCP 层的字节流数据,作为 AMQP 协议消息的输入通道 | ▼ ┌────────────┐ │ Channel │ └────────────┘ RabbitMQ 的消息处理通道:将接收到的 AMQP 字节流解析为结构化命令。识别控制字段(如 basic.publish)、 参数(如 exchange、routingKey)和消息体,并调用对应的内部方法进行处理。 │ basic.publish ▼ ─────────────────────────────────────▶ ┌────────────┐ │ Exchange │ └────────────┘ 根据 exchange 的类型(fanout、direct、topic、headers)与 routingKey, 匹配绑定规则(binding),将消息分发到一个或多个符合条件的队列。 │ ▼ ┌────────────┐ │ Queue │ └────────────┘ 按 FIFO 原则缓存等待消费。支持内存与磁盘混合存储、消息确认机制(ACK/NACK) TTL、死信等处理策略。 │ ▼ 消费者收到消息
|
常用消息场景
WrokQueue
生产者生成的消息分发给多个消费者的消息
Consumer 对每个消息必须应答
Consumer端每消费完要给消息,需要给服务端一个ack应答,这个应答可以是手动应答,也可以是自动应答。如果Consumer一直没有给服务端应答,那么服务端不断将这条消息进行投递,就会不断地消耗系统资源。
RabbitMQ并不完全保证消息安全
关键的message不能因为服务出现问题而被忽略。如果想要保证消息不丢失,在RabbitMQ中,需要同时将队列和消息的durable属性都设置成true。但是durable仍然不能完全保证消息不被丢失。RabbitMQ是定期将缓存写入到磁盘中,所以当出现异常时,缓存中的数据会丢失。
消息如何在多个Consumer之间分发
默认采用了fair dispatch 。即消息轮询,向消费者发送。
Publish/Subscribe (发布/订阅)
Producer 向交换器发布消息,交换器向所有消费者发送消息(指定Fanout队列)
1 2 3
| P ----> X ----> amq.gen-RQ6 -> C1 ----> amq.gen-As8 -> C2
|
Routing 基于内容的路由
通过 routingKey 来查询特定队列向其中发送消息
1
| channel.baseicPublish(EXCHANGE_NAME, routingKey"debug", props : null, message.getBytes());
|
1 2 3 4
| P ----> error ----> amq.gen-S9b -> C1 ----> info ----> ----> warn ----> amq.gen.Ag1 -> C2 ----> error ---->
|
Topics 基于话题的路由
routingKey 作为话题进行转发
1 2 3 4
| P ----> X ----> *.orange.* ----> Q1 ----> C1 ----> *.*.rabbit ---->Q2 ----> C2 ----> lazy.
|
定制键值对,指定交换器类型为Headers。设置channel绑定的头部信息,通过匹配头部信息转发
Publisher Confirms 发送者消息确认
通过阻塞等待接收发布消息的确认
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| channel.confirmSelect();
for(int i = 0 ; i< MESSAGE_COUNT;i++){ String body = String.value(i); channel.basicPublish("",queue,null,body.getBytes()); channel.waitForConfirmsOrDie(5_000); }
int batchSize = 100 ; int outstandingMessageCount = 0 ; long start = System.nonoTime(); for(int i = 0 ; i < MESSAGE_COUNT ; i++ ){ String body = String.valueOf(i); ch.basicPublish("", queue, null , body.getBytes()); outstandingMessageCount++ ;
if(outstandingMessageCount == batchSize){ ch.waitForConfirmsOrDie(5_000); outstandingMessageCount = 0 ; } }
if(outstandingMessageCount > 0){ ch.waitForConFirmsOrDie(5_000); }
channel.addConfirmListener(ConfirmCallback var1 , ConfirmCallback var2);
|
队列结构
Classic 结构队列
FIFO队列的简单实现。如果持久化置位为true,则将消息存储进磁盘,如果持久化置位为false,则将消息加入缓存中
Quorum 仲裁队列
数据一定会持久化,不能自动删除,提高数据的安全性
不适合的场景
- 一些临时使用的队列:比如transient临时队列,exclusive独占队列,或者经常会修改和删除的队列
- 对消息低延迟要求高,一致性算法会影响消息的延迟
- 对数据安全性要求不高: Quorum队列需要消费者手动通知或者生产者手动确认
- 队列消息积压严重
Stream流式队列
一个基于日志模型的新型队列实现,支持 Kafka 式的顺序流、高并发、可回溯消费,同时保留 RabbitMQ 的灵活路由机制。
类似于 Kafka 的 Topic,每个 Stream 是一个 可追加的持久化日志序列
消息写入后按序编号,消费者使用 offset 拉取
Stream 可以配置为多分区,便于水平扩展和并行消费
每个分区就是一个独立的顺序日志
每条消息有全局唯一的 offset
消费者可指定 offset(earliest/latest/指定位置)启动
支持消费组状态保存(需要启用 Broker-side offset tracking)
1 2 3 4
| params.put("x-queue-type","stream"); params.put("x-max-length-bytes",20_000_000_000L); params.put("x-stream-max-segment-size-bytes",100_000_000); channel.queueDeclare(QUEUE_NAME,true,false,false,params);
|
死信队列
用于接收其他队列中未能正常消费的消息的队列
1 2 3 4
| x-dead-letter-exchange:mirror.dlExchange 对应的死信交换机 x-dead-letter-routing-key: mirror.messageExchange1.messageQueue1 死信交换机routing-key x-message-ttl: 3000 消息过期时间 durable: true 持久化
|
何时会产生死信
- 消息被消费者确认拒绝,消费者把requeue参数设置为ture,并且在消费后,向RabbitMQ返回拒绝
- 消息达到预设的TTL时限还一直没有被消费
- 消息由于队列已经达到最长长度限制而被丢掉
Sharding 插件
通过分队列的方式提高消息的吞吐量

设置sharding 策略

定义交换机

最终会绑定3个虚拟队列
在消费端绑定虚拟交换机
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| public class ShardingConsumer { public static final String QUEUENAME= "sharding_exchange"; public static void main(String[] args) throws IOException, TimeoutException{ ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.65.193"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); factory.setVirtualHost("/mirror"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUENAME,false ,false,false,null); Consumer myconsumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException{ String routingKey = envelope.getRoutingKey(); System.out.println("routingKey >" + routingKey); String contentType = properties.getContentType(); System.out.println("contentType >" + contentType); long deliveryTag = envelope.getDeliveryTag(); System.out.println("deliveryTag >" + deliveryTag); System.out.println("content:" + new String(body,"UTF-8")); } }; String consumerFlag1 = channel.basicConsume(QUEUENAME,true , myconsumer); System.out.println("c1:" + consumerFlag1); String consumerFlag2 = channel.basicConsume(QUEUENAME,true, myconsumer); System.out.println("c2:" + consumerFlag2); String consumerFlag3 = channel.basicConsume(QUEUENAME,true, myconsumer); System.out.println("c3:" + consumerFlag3); } }
|
使用Sharding后,因为只需要指定虚拟的Exchange,并不能确定消息最终会发送到哪条队列。虽然策略是均匀保存测率,但是不能完全保证均匀。
所以适用于消息延迟要求不严格,以及对消费顺序没有任何要求的场景。
另外尽量不要使用碎片队列接收,会导致队列的消息分配会受到影响。