From fd48de83b5df7f8c0be678f70e30689edd86c430 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=96=AF=E7=8B=82=E7=9A=84=E7=8B=AE=E5=AD=90Li?= <15040126243@163.com> Date: Tue, 4 Jun 2024 14:50:54 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E4=BC=98=E5=8C=96=20=E5=AE=8C?= =?UTF-8?q?=E5=96=84=20rocketmq=20=E7=9B=B8=E5=85=B3=E6=BC=94=E7=A4=BA?= =?UTF-8?q?=E6=A1=88=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ruoyi-example/ruoyi-test-mq/README.md | 13 +++++------- .../stream/consumer/NormalRocketConsumer.java | 11 ++++++---- .../consumer/TransactionRocketConsumer.java | 3 ++- .../stream/producer/NormalRocketProducer.java | 21 ++++++++++++++++--- .../src/main/resources/application.yml | 8 +++---- 5 files changed, 36 insertions(+), 20 deletions(-) diff --git a/ruoyi-example/ruoyi-test-mq/README.md b/ruoyi-example/ruoyi-test-mq/README.md index d560b0b9..fd446fb6 100644 --- a/ruoyi-example/ruoyi-test-mq/README.md +++ b/ruoyi-example/ruoyi-test-mq/README.md @@ -2,13 +2,10 @@ ## 模块说明 -1. rabbit: 普通消息、延迟队列 -2. rocket:普通消息、事务消息 +1. rabbitmq: 普通消息、延迟队列 +2. rocketmq:普通消息、事务消息、延迟消息 3. kafka:普通消息、stream流的使用 -后续可实现的: - -1. rocket 顺序、异步、延时等 ## 使用方式 @@ -23,7 +20,7 @@ sh mqadmin updateTopic -n -t -c ``` ```shell -bin/mqadmin updatetopic -n localhost:9876 -t TestTopic -c DefaultCluster +bin/mqadmin updatetopic -n localhost:9876 -t test-topic -c DefaultCluster ``` 创建事务消息的topic @@ -33,7 +30,7 @@ sh mqadmin updateTopic -n -t -c ``` ```shell -bin/mqadmin updatetopic -n localhost:9876 -t transaction_topic -c DefaultCluster -a +message.type=TRANSACTION +bin/mqadmin updatetopic -n localhost:9876 -t transaction-topic -c DefaultCluster -a +message.type=TRANSACTION ``` kafka: @@ -43,5 +40,5 @@ kafka-topics.sh --create --topic --bootstrap-server - ``` ```shell -kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 +kafka-topics.sh --create --topic test-topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 ``` diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/NormalRocketConsumer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/NormalRocketConsumer.java index 58ad57b9..8900d0e3 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/NormalRocketConsumer.java +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/NormalRocketConsumer.java @@ -1,6 +1,7 @@ package org.dromara.stream.consumer; import lombok.extern.slf4j.Slf4j; +import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; @@ -11,10 +12,12 @@ import org.springframework.stereotype.Component; **/ @Slf4j @Component -@RocketMQMessageListener(topic = "TestTopic", consumerGroup = "springboot-mq-consumer-1") -public class NormalRocketConsumer implements RocketMQListener { +@RocketMQMessageListener(topic = "test-topic", consumerGroup = "test-consumer-group") +public class NormalRocketConsumer implements RocketMQListener { + @Override - public void onMessage(String message) { - log.info("【消费者】接收消息:{}" ,message); + public void onMessage(MessageExt ext) { + log.info("【消费者】接收消息:消息体 => {}, tag => {}", new String(ext.getBody()), ext.getTags()); } + } diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/TransactionRocketConsumer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/TransactionRocketConsumer.java index 1dd758e1..3c56d8db 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/TransactionRocketConsumer.java +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/TransactionRocketConsumer.java @@ -11,11 +11,12 @@ import org.springframework.stereotype.Component; **/ @Slf4j @Component -@RocketMQMessageListener(consumerGroup = "transaction-group", topic = "transaction_topic") +@RocketMQMessageListener(topic = "transaction-topic", consumerGroup = "transaction-group") public class TransactionRocketConsumer implements RocketMQListener { @Override public void onMessage(String message) { log.info("【消费者】===>接收事务消息:{}",message); } + } diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/NormalRocketProducer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/NormalRocketProducer.java index 27583533..e2979329 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/NormalRocketProducer.java +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/NormalRocketProducer.java @@ -1,9 +1,10 @@ package org.dromara.stream.producer; import lombok.extern.slf4j.Slf4j; -import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.apache.rocketmq.spring.support.RocketMQHeaders; import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @@ -19,7 +20,21 @@ public class NormalRocketProducer { private RocketMQTemplate rocketMQTemplate; public void sendMessage() { - SendResult sendResult = rocketMQTemplate.syncSend("TestTopic", MessageBuilder.withPayload("hello world test").build()); - log.info("发送普通同步消息-msg,syncSendMessage===>{}", sendResult); + // 发送普通消息 + // rocketMQTemplate.convertAndSend("test-topic", "test"); + + // 发送带tag的消息 + Message message = MessageBuilder.withPayload("test").setHeader(RocketMQHeaders.TAGS, "test-tag").build(); + rocketMQTemplate.send("test-topic", message); + + // 延迟消息 + // RocketMQ预定义了一些延迟等级,每个等级对应不同的延迟时间范围。这些等级从1到18,分别对应1s、5s、10s、30s、1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、1h、2h的延迟时间。 + org.apache.rocketmq.common.message.Message msg = new org.apache.rocketmq.common.message.Message(); + msg.setDelayTimeLevel(3); + try { + rocketMQTemplate.getProducer().send(msg); + } catch (Exception e) { + e.printStackTrace(); + } } } diff --git a/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml b/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml index 1183287e..df7e9f2b 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml +++ b/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml @@ -36,10 +36,10 @@ spring: --- # rocketmq 配置 rocketmq: - name-server: localhost:9876 - producer: - # 生产者组 - group: dist-test + name-server: localhost:9876 + producer: + # 生产者组 + group: dist-test --- # nacos 配置 spring: