RocketMQ

JMS

java消息服务(Java Message Service) Java平台中关于面向消息中间件的接口

JMS 提供者 : 连接面向消息中间件,JMS接口的一个实现

JMS 生产者 :Message Producer 生产消息的服务

JMS 消费者 :Message Consumer 消费消息的服务

JMS 消息 : 数据对象

JMS 队列 : 存储待消费消息的区域

JMS 主题 : 一种支持发送消息给多个订阅者的机制

JMS消息通常由两种类型 : 点对点 ,发布/订阅

基础编程模型

MQ中的一些原生类

ConnectionFactory : 连接工厂 , JMS 用它创建链接

Connection : JMS 客户端到JMS Provider的连接

Session : 一个发送或接收消息的线程

Destination : 消息的目的地

MessageConsumer : MessageProducer 消息消费者,消息生产者

RocketMQ

  • 支持Broker和Consumer端消息过滤
  • 支持发布/订阅模型,和点对点
  • 支持拉pull和推push两种消息模式
  • 单一队列百万消息,亿级消息堆积
  • 支持单master节点,多master节点,多master多slave节点
  • 任意一点都是高可用,水平拓展,Producer,Consumer, 队列都可以分布式
  • 消息失败重试机制,支持定义level的定制消息
  • 底层采用netty
  • 支持分布式事务

概念

  • Producer 消息生产者
  • Producer Group 消息生产者组
  • Consumer 消费者
  • Consumer Group 消费同类消息的多个示例
  • Tag : 标签,子主题(二级分类)对topic的进一步细化,用于区分同一个主题下的不同业务消息
  • Topic主题, 逻辑管理单位 一个topic下可以有多个queue
  • Message 消息 每个message 必须指定一个topic
  • Broker MQ程序,接收生成的消息,提供给消费者消费的程序
  • Name Server 给生成和消费者提供路由信息,提供轻量级的路由发现,路由,元数据信息。可以多个部署,互相独立
  • Offset 偏移量
  • commit log :消息存储会写在Commit log文件里

工程实现

普通消息发送

同步发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public class SyncProducer{
public static void main(String[] args ) throws Exception{
DefaultMQProducer producer = new DefaultMQproducer("group_test");

producer.setNamesrvAddr("127.0.0.1:9876");
//producer.setSendLatencyFaultEnable(true);
producer.start();
for(int i = 0 ;i < 10 ; i++){
Message msg = new Message("TopicTest","TagA",("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
};
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n",sendResult);
}
producer.shutdown();
}
}

异步发送

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class AsyncProducer {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("group_test");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for(int i = 0 ; i < 10 ; i++){
final int index = i ;
Message msg = new Message("TopicTest","TagA","OrderID888");
producer.send(msg, new SendCallback(){
@Override
public void onSuccess(SendResult sendResult){
System.out.printf("%s%n",sendResult);
}
@Override
public void onException(Throwable e){
System.out.printf("%-10d Exception %s %n ",index,e);
e.printStackTrace();
}
});
}
Thread.sleep(10000);
producer.shutdown();
}
}

单向发送

1
2
3
4
5
6
7
8
9
10
11
12
13
public class OnewayProducer{
public static void main(String[] args) throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("group_test");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
for(int i = 0 ; i < 10 ; i++){
Message msg = new Message("TopicTest","TagA",("Hello RocketMQ" + i).getBytes(RemotingHelper.DEFAULT_CHARSET)
);
producer.sendOneWay(msg);
}
producer.shutdown();
}
}

普通消息的消费模式

1
2
consumer.setMessageModel(MessageModel.CLUSTERING);
// 设置集群消费模式,轮询投递给每个消费者
1
2
consumer.setMessageModel(MessageModel.Boardcasting);
// 设置广播消费模式, 每个消费者都获取同样的消息

顺序消息

顺序消息

同一个queue中会顺序消费

部分顺序消息

向同一个序列中某个流程的执行过程是有序的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
// 生产方
public class ProducerInOrder {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("OrderProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
List<Order> orderList = new ProducerInOrder().buildOrders();
for(int i = 0 ; i < orderList.size();i++){
String body = orderList.get(i).toString();
Message msg = new Message("PartOrder”,null,"KEY"+i,body.getBytes());
SendResult sendResult = producer.send(msg,new MessageQueueSelector(){
@Override
public MessageQueue select(List<MessageQueue>mqs, Message msg , Object arg){
@Override
public MessageQueue select(List<MessageQueue>mqs , Message msg , Object arg){
Long id = (Long) arg;
long index = id % mqs.size();
return mqs.get((int)index);
}
}, orderList.get(i).getOrderId());
System.out.println(String.format("SendResult status %s ,queueId: %d , body: %s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
}
}

// 消费方
public class ConsumerInOrder {
public static void main(String[] args ) throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPsuhConsumer("OrderConsumer2");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUM_FROM_LAST_OFFSET);
consumer.subscribe("PartOrder","*");
consumer.registerMessageListener(new MessageListenerOrderly(){
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context){
context.setAutoCommit(true);
for(MessageExt msg : msgs ){
System.out.println("consumeThread=" + Thread.currentThread().getName()
+",queueId=" + msg.getQueueId() + ",content:" + new String(msg.getbody()));
}
try {
TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
} catch (Exception e){
e.printStackTrace();
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}
return ConsumeOrderlyStatus.SUCCESS;
}

})

}

}

延迟消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 生产者
public class ScheduledMessageProducer{
public static void main(String[] args) throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("ScheduledProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
int totalMessagesToSend = 10 ;
for(int i = 0 ; i < totalMessagesToSend ; i++){
Message message = new Message("ScheduledTopic",("Hello scheduled message"+i).getBytes());
message.setDelayTimeLevel(4);
producer.send(message);
}
producer.shutdown();
}
}

批量消息

一次性发送多条消息,但是注意总大小不能超过4M,否则会阻塞消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public class BatchMessageProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("BatchProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "Tag", "OrderID001","HelloWorld1".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID002","HelloWorld2".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID003","HelloWorld3".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID004","HelloWorld4".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID005","HelloWorld5".getBytes()));
messages.add(new Message(topic, "Tag", "OrderID006","HelloWorld6".getBytes()));
try {
producer.send(messages);
} catch (Exception e){
producer.shutdown();
e.printStactkTrace();
}
producer.shutdown();
}
}

过滤消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 生产端
public class TagFilterProducer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producter = new DefaultMQProducer("TagFilterProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[] {"TagA","TagB","TagC"};
for(int i = 0 ; i < 3 ; i++){
Message msg = new Message("TagFilterTest");
tags[i % tags.length],
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult sendResult = producer.send(msg);
System.out.printf("%s%n",sendResult);
}
producer.shutdown();
}
}

// 消费端
public class TagFilterConsumer {
public static void main(String[] args) throws Exception{
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("TagFilterConsumer");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("TagFilterTest","TagA||TagB");
consumer.registerMessageListener(new MessageListenerConcurrently(){
@Override
public ConsumeConcurrentlyStatus consumerMessage(List<MessageExt>msgs ,
ConsumeConcurrentlyContext context){
try {
for(MessageExt msg : msgs){
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(),"UTF-8");
String msgPro = msg.getProperty("a");
String tags = msg.getTags();
System.out.println("收到消息"+ topic + "tags" + tags + "a:" + msgPro + ",msg:" +msgBody);
}
} catch (Exception e){
e.printStackTrace();
}
}
})
}
}

核心方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
// 生产
public class ProducerDetails {
public static void main(String[] args) throws Exception{
DefaultMQProducer producer = new DefaultMQProducer("producer_details");
// 生产者所属组
producer.setDefaultTopicQueuenums(num);
// 默认主题在每一个Brokcer队列数量
producer.setSendMsgTimeout(1000*3);
// 发送消息默认超时时间
producer.setCompressMsgBodyOverHowmuch(1024*4);
// 超过限值启动压缩功能
producer.setRetryTimesWhenSendFailed(2);
// 同步方式发送消息重试次数
producer.setRetryTimesWhenSendAsyncFailed(2);
// 异步方式发送消息重试次数
producer.setRetryAnotherBrokerWhenNotStoreOK(false);
// 消息重试时选择另一个Broker
producer.setMaxMessageSize(1024 * 1024 * 4);
// 允许发送的最大长度信息
producer.setNamesrvAddr("106.55.246.66.9876");
// nameserver 地址
producer.start();
}
}

public class ConsumerDetails {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("king");
// 消费者组
consumer.setNamesrvAddr("106.55.246.66:9876");
// 设置nameserver
consumer.setMessageModel(MessageModel.CLUSTERING);
// 设置消费模式
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
// 设置消费开始偏移量
consumer.setConsumeThreadMin(20);
// 消费者最小线程数量
consumer.setConsumeThreadMax(20);
// 消费者最大线程数量
consumer.setPullInterval(0);
// 推模式下任务时间间隔
consumer.setPullBatchSize(32);
// 推模式下拉去的条数
consumer.setMaxReconsumeTimes(-1);
// 消息重试次数
consumer.setConsumeTimeout(15);

Set<MessageQueue> MessageQueueSet = consumer.fetchSubscribeMessageQueues("TopicTest");
Iterator iterator = MessageQueueSet.iterator();
while(iterator.hasNext()){
MessageQueue MessageQueue = (MessageQueue)iterator.next();
System.out.println(MessageQueue.getQueueId());
}
consumer.subscribe("TopicTest","*");
consumer.subscribe("TopicTest",MessageSelector.bySql());
consumer.subscribe("TopicTest",MessageSelector.byTag());
consumer.unsubscribe("TopicTest");
// 筛选订阅

consumer.registerMessageListner(new MessageListenerConcurrently(){
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext Context);
try {
for(MessageExt msg : msgs){
String topic = msg.getTopic();
String msgBody = new String(msg.getBody(),"utf-8");
String tags = msg.getTags();
System.out.println("收到消息:" + "topic : " + topic + ", tags :" + tags + “,msg:” + msgBody );
}
}catch (Exception e){
e.printStackTrace();
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 并发监听
consumer.registerMessageListener(new MessageListenerOrderly(){
Random random = new Random();
@Override
public ConsumeOrderlyStatus consumeMessage(List<MessageExt>msgs,ConsumeOrderlyContext Context){
context.setAutoCommit(true);
for(MessageExt msg : msgs){
System.out.println("consumeThread=" + Thread.currentThread().getName()+"queueId="
+ msg.getQueueId() + ",content");
}
try {
TimeUnit.MILLISECONDS.sleep(random.nextInt(300));
}catch (Exception e){
e.printStackTrace();
return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
}return ConsumeOrderlyStatus.SUCCESS;
}
});
// 顺序监听
consumer.start();
}
}

集群部署

单master 模式

多master 模式

多master 多slave模式(同步)

多master 多slave 模式(异步)

多master 多slave + 持久化(硬盘写入)

存储设计

image

可以看到Rocket 分为4个块

  1. Message 消息生成者,向对应的Topic 发布消息
  2. Topic 主题,消息携带的标志信息,还可以使用tags做进一步细分
  3. Queue/Offset Queue实际上的消息分发队列,Offset记录消息的读取位置
  4. Group 消费者组,同一消费者组共享消费策略

顺序和重复问题

RocketMQ 没有提供防止乱序和重复的机制。因为

  1. 乱序应用实际大量存在
  2. 队列无序并不意味着消息无序

重复问题

  1. 消息端处理消息的业务逻辑保持幂等性
  2. 确保每一条消息都有唯一的编号且保证消息处理成功与去重表的日志同时出现。

存储结构

RocketMQ消息的存储是由ConsumeQueue 和 CommitLog 配合完成的,消息真正的物理存储文件是CommitLog,ConsumeQueue是消息的逻辑队列。类似数据库的索引文件。存储的是指向物理存储的地址。每个Topic下的每个Message Queue都有一个对应的ComsumeQueue文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Commitlog 
-------------------------------------------------
消息长度 | 消息其他信息 | 消息长度 | 消息其他信息

// 每个CommitLog文件的大小为1G,写完之后记录偏移量为1G的位置继续写入

ComsumeQueue
------------------------------------------------
commitLogOffset | size | tag hashcode
// ComsumeQueue 是消息的逻辑队列,类似数据库索引,存储的是指向物理存储的地址
// 文件默认存储在 ${ROCKETMQ_HOME}/store/consumequeue/{topic}/{queueId}/

Index
index 存储了索引文件,用于加快Key查询时的 查询速度 结构为Hash槽与Hash冲突的链表结构

Config

存储Topic 和 Consumer 等相关信息
topics.json : topic 配置属性
subscriptionGroup.json : 消息消费组配置信息
delayOffset.json : 延迟消息队列拉取进度
consumerOffset.json : 集群消费模式消息进度
consumerFilter.json : 主题消息过滤信息

文件删除

如果非当前写文件在一定时间间隔内没有再次被更新,则认为是过期文件。RocketMQ不会关注这个文件中的消息是否全部被消费,默认过期时间为42小时

1
2
3
4
5
6
7
fileReservedTime   未更新时间
deletePhysicFileInterval 删除物理文件的时间间隔
destroyMapedFileIntervalForcibly 删除时发现还有线程在引用,会阻止删除操作
deleteWhen 定时删除文件
DiskSpcaeCleanForciblyRatio 磁盘控件默认水位
diskSpaceWarningLevelRatio 使用率大于这个值,阻止写入
diskMaxUsedSpaceRatio 小于这个值,表示使用正常

零拷贝

1 可以减少数据拷贝和共享总线操作的次数

2 减少用户空间到内核空间切换的上下文开销

传统机制

1 磁盘 读取到 内核缓冲区

2 内核缓冲区拷贝到用户空间

3 用户空间拷贝到socket缓冲区

4 socket缓冲区拷贝到socket网卡

mmap

将用户文件直接映射到用户空间内容

分布式事务

事务的执行需要保持ACID 原则 一致性,持久性,隔离性,原子性。

而当A系统的事务需要B系统参与时,A系统无法完全控制在B系统上的执行过程(比如断网)

这时就会出现分布式事务问题

异步化

通过MQ,可以为事务当前的处理流程标记一个“状态”。这样对于A系统来说,事务的执行过程仍然是可控的。

但是响应过程会存在如下问题 : 当A系统生产了信息交给MQ发送给B系统后,何时发出对B给出结果的响应?

  1. 先执行,后传递消息 ———- 出现通信问题,B系统执行失败。导致两端不一致
  2. 先传递消息,后执行 ———- 出现通信问题,B系统执行。但是消息未发回导致两端不一致。

半事务消息/事务回查

  1. 向MQ存储一个commitlog,不创建consumerqueue
  2. 后续MQ 进行事务回查,如果成功了,创建consumerqueue加入队列,如果失败了则回滚。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public class TransactionProducer{
public static void main(String[] args) throws Exception{
TransactionListener transactionListener = new TransactionListenerImpl();
TranscationMQProducer producer = new TransactionMQProducer("TransactionProducer");
producer.setNamesrvAddr("127.0.0.1:9876");
ExecutorService executorService = new ThreadPoolExecutor(2,5,100){;
@Override
public Thread newThread(Runnable r){
Thread thread = new Thread(r);
thread.setName("client-transcation-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
producer.setTransactionListener(transactionListener);
producer.start();
try {
Message msg = new Message("TransactionTopic", null ,("支付").getBytes(RemotingHelper.DEFAULT_CHARSET);)
SendResult sendResult = producer.sendMessageInTransaction(msg,null);
SimpleDateFormat df = new SimpleDataFormat("yyyy-MM-dd HH:mm:ss");
System.out.println(sendResult.getSendStatus()+"-"+df.format(new Date()));
} catch (MQClientException | UnsupportedEncodingException e){
e.printStatckTrace();
}
for(int i = 0 ; i < 1000 ; i ++){
Thread.sleep(1000);

}
}

// 消费者
public class TransactionConsumer{
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Transaction");
consumer.setNamesrvAddr("127.0.0.1:9876");
consumer.subscribe("Transaction","*");
consumer.setMessageModel("MessageModel.CLUSTERING");
consumer.registerMessageListener(new MessageListenerConcurrently(){
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs
, ConsumeConcurrentlyContext context);
try{
for(MessageExt msg : msgs){
System.out.println("update B ... where transactionId" + msg.getTransactionId());
System.out.println("commit :" + msg.getTransactionId());
System.out.println("执行本地事务成功");
} catch(Exception e){
e.printStackTrace();
System.out.println("执行本地事务失败,重试消费,尽量确认B处理成功");
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});


}

}

队列选择策略

Topic 创建在多Broker 中

  选择队列策略      —由nameserver 提供路由信息

重试 — 失败后进行重试

规避策略 — 失败超过最大限值,会规避该broker

故障延迟

记录Broker 发送时长,计算规避时间,选择可用broker

即当 生产者发送时延过久,会设置一个规避时间,规避时间段内不会选择该broker发送消息

默认消息发送规则

轮询

消费者

  1. 平均消费 6个队列 3个消费者 每个消费者消费2个队列
  2. 轮询 消费者按照顺序获取消息进行消费

RocketMQ
http://gadoid.io/2025/05/24/RocketMQ/
作者
Codfish
发布于
2025年5月24日
许可协议