diff --git a/config/nacos/ruoyi-gateway.yml b/config/nacos/ruoyi-gateway.yml index 4c449977..b70b6d58 100644 --- a/config/nacos/ruoyi-gateway.yml +++ b/config/nacos/ruoyi-gateway.yml @@ -73,10 +73,10 @@ spring: filters: - StripPrefix=1 # MQ演示服务 - - id: ruoyi-stream-mq - uri: lb://ruoyi-stream-mq + - id: ruoyi-test-mq + uri: lb://ruoyi-test-mq predicates: - - Path=/stream-mq/** + - Path=/test-mq/** filters: - StripPrefix=1 diff --git a/pom.xml b/pom.xml index 9772dea0..a611c7a9 100644 --- a/pom.xml +++ b/pom.xml @@ -45,7 +45,6 @@ 1.3.6 0.2.0 1.16.6 - 2.3.0 2.7.0 @@ -58,8 +57,10 @@ 3.2.1 - + 7.0.0 + + 2.3.0 3.11.0 @@ -389,7 +390,7 @@ org.apache.rocketmq rocketmq-spring-boot-starter - ${rocketmq-version} + ${rocketmq.version} diff --git a/ruoyi-example/ruoyi-test-mq/README.md b/ruoyi-example/ruoyi-test-mq/README.md index 9fba827c..b44f99ad 100644 --- a/ruoyi-example/ruoyi-test-mq/README.md +++ b/ruoyi-example/ruoyi-test-mq/README.md @@ -2,12 +2,6 @@ ## 模块说明 -该模块基于需求【[将spring-cloud-stream改为普通spring的mq依赖用法】下修改编写; - -原模块缺点:功能复杂学习成本高 大部分用户用不明白 功能封闭 特性无法使用 项目中基本不会有切换mq的事情发生; - -现模块:集成基础的rabbit、rocketmq、kafka等主流的中间件,功能包含 - 1. rabbit: 普通消息、延迟队列 2. rocket:普通消息、事务消息 3. kafka:普通消息 @@ -17,50 +11,6 @@ 1. kafka stream流的使用 2. rocket 顺序、异步、延时等 -## 项目目录 - -```xml -├─src -│ └─main -│ ├─java -│ │ └─org -│ │ └─dromara -│ │ └─stream -│ │ │ RuoYiTestMqApplication.java -│ │ │ -│ │ ├─config -│ │ │ RabbitConfig.java 普通消息配置类 -│ │ │ RabbitTtlQueueConfig.java 延迟队列配置类 -│ │ │ -│ │ ├─controller 测试类 -│ │ │ PushMessageController.java -│ │ │ -│ │ └─mq -│ │ ├─consumer -│ │ │ ├─kafkaMq -│ │ │ │ KafkaNormalConsumer.java -│ │ │ ├─rabbit -│ │ │ │ ConsumerListener.java -│ │ │ └─rocketmq -│ │ │ NormalRocketConsumer.java -│ │ │ TransactionRocketConsumer.java -│ │ ├─listener -│ │ │ TranscationRocketListener.java -│ │ └─producer -│ │ ├─kafkaMq -│ │ │ KafkaNormalProducer.java -│ │ ├─rabbitMq -│ │ │ DelayRabbitProducer.java -│ │ │ NormalRabbitProducer.java -│ │ └─rocketMq -│ │ NormalRocketProducer.java -│ │ TransactionRocketProducer.java -│ │ -│ └─resources -│ application.yml IP:Host根据实际情况替换 -│ logback-plus.xml -``` - ## 使用方式 rocketmq: @@ -96,7 +46,3 @@ kafka-topics.sh --create --topic --bootstrap-server - ```shell kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1 ``` - -## 验证方式 - -可通过`PushMessageController`实现`Restful`进行测试; \ No newline at end of file diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitConfig.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitConfig.java index 3995599c..ba18ccd2 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitConfig.java +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitConfig.java @@ -16,6 +16,7 @@ public class RabbitConfig { public static final String EXCHANGE_NAME = "demo-exchange"; public static final String QUEUE_NAME = "demo-queue"; public static final String ROUTING_KEY = "demo.routing.key"; + /** * 创建交换机 * ExchangeBuilder有四种交换机模式 @@ -29,6 +30,7 @@ public class RabbitConfig { public TopicExchange exchange() { return new TopicExchange(EXCHANGE_NAME); } + /** * 创建队列 * durable 队列是否持久化 队列调用此方法就是持久化 可查看方法的源码 @@ -38,6 +40,7 @@ public class RabbitConfig { public Queue queue() { return new Queue(QUEUE_NAME, false); } + /** * 绑定交换机和队列 * bing 方法参数可以是队列和交换机 diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitTtlQueueConfig.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitTtlQueueConfig.java index 9fc9693a..9416e1da 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitTtlQueueConfig.java +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/RabbitTtlQueueConfig.java @@ -1,13 +1,15 @@ package org.dromara.stream.config; import org.springframework.amqp.core.*; -import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; +import java.util.Map; + /** * RabbitTTL队列 + * * @author xbhog */ @Configuration @@ -26,44 +28,58 @@ public class RabbitTtlQueueConfig { public static final String DEAD_LETTER_QUEUE = "dlx-queue"; // 死信路由键名称 public static final String DEAD_LETTER_ROUTING_KEY = "dlx.routing.key"; - // 延迟消息的默认 TTL(毫秒) - @Value("${rabbitmq.delay.ttl:5000}") - private long messageTTL; - // 声明延迟队列 + /** + * 声明延迟队列 + */ @Bean public Queue delayQueue() { return QueueBuilder.durable(DELAY_QUEUE_NAME) - .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE) - .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY) - .withArgument("x-message-ttl", messageTTL) + .deadLetterExchange(DEAD_LETTER_EXCHANGE) + .deadLetterRoutingKey(DEAD_LETTER_ROUTING_KEY) .build(); } - // 声明延迟交换机 + + /** + * 声明延迟交换机 + */ @Bean - public TopicExchange delayExchange() { - return new TopicExchange(DELAY_EXCHANGE_NAME); + public CustomExchange delayExchange() { + return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message", + true, false, Map.of("x-delayed-type", "direct")); } - // 将延迟队列绑定到延迟交换机 + + /** + * 将延迟队列绑定到延迟交换机 + */ @Bean - public Binding delayBinding(Queue delayQueue, TopicExchange delayExchange) { - return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY); + public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) { + return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs(); } - // 声明死信队列 + + /** + * 声明死信队列 + */ @Bean public Queue deadLetterQueue() { return new Queue(DEAD_LETTER_QUEUE); } - // 声明死信交换机 + + /** + * 声明死信交换机 + */ @Bean - public TopicExchange deadLetterExchange() { - return new TopicExchange(DEAD_LETTER_EXCHANGE); + public DirectExchange deadLetterExchange() { + return new DirectExchange(DEAD_LETTER_EXCHANGE); } - // 将死信队列绑定到死信交换机 + /** + * 将死信队列绑定到死信交换机 + */ @Bean - public Binding deadLetterBinding(Queue deadLetterQueue, TopicExchange deadLetterExchange) { + public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) { return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTING_KEY); } + } diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rabbit/ConsumerListener.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/ConsumerListener.java similarity index 96% rename from ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rabbit/ConsumerListener.java rename to ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/ConsumerListener.java index 9d05927a..b66453ce 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rabbit/ConsumerListener.java +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/ConsumerListener.java @@ -1,4 +1,4 @@ -package org.dromara.stream.mq.consumer.rabbit; +package org.dromara.stream.consumer; import lombok.extern.slf4j.Slf4j; import org.dromara.stream.config.RabbitConfig; diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/kafkaMq/KafkaNormalConsumer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/KafkaNormalConsumer.java similarity index 92% rename from ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/kafkaMq/KafkaNormalConsumer.java rename to ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/KafkaNormalConsumer.java index 11721153..c964c265 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/kafkaMq/KafkaNormalConsumer.java +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/KafkaNormalConsumer.java @@ -1,4 +1,4 @@ -package org.dromara.stream.mq.consumer.kafkaMq; +package org.dromara.stream.consumer; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/NormalRocketConsumer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/NormalRocketConsumer.java similarity index 92% rename from ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/NormalRocketConsumer.java rename to ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/NormalRocketConsumer.java index 5a95f9c2..58ad57b9 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/NormalRocketConsumer.java +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/NormalRocketConsumer.java @@ -1,4 +1,4 @@ -package org.dromara.stream.mq.consumer.rocketmq; +package org.dromara.stream.consumer; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/TransactionRocketConsumer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/TransactionRocketConsumer.java similarity index 92% rename from ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/TransactionRocketConsumer.java rename to ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/TransactionRocketConsumer.java index 75ebc954..1dd758e1 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/consumer/rocketmq/TransactionRocketConsumer.java +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/TransactionRocketConsumer.java @@ -1,4 +1,4 @@ -package org.dromara.stream.mq.consumer.rocketmq; +package org.dromara.stream.consumer; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/controller/PushMessageController.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/controller/PushMessageController.java index f815b515..788ba343 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/controller/PushMessageController.java +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/controller/PushMessageController.java @@ -1,12 +1,8 @@ package org.dromara.stream.controller; -import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; -import org.dromara.stream.mq.producer.kafkaMq.KafkaNormalProducer; -import org.dromara.stream.mq.producer.rabbitMq.DelayRabbitProducer; -import org.dromara.stream.mq.producer.rabbitMq.NormalRabbitProducer; -import org.dromara.stream.mq.producer.rocketMq.NormalRocketProducer; -import org.dromara.stream.mq.producer.rocketMq.TransactionRocketProducer; +import org.dromara.stream.producer.*; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @@ -16,57 +12,58 @@ import org.springframework.web.bind.annotation.RestController; */ @Slf4j @RestController -@RequestMapping("push/message") +@RequestMapping public class PushMessageController { - @Resource + @Autowired private NormalRabbitProducer normalRabbitProducer; - - @Resource + @Autowired private DelayRabbitProducer delayRabbitProducer; - - @Resource + @Autowired private NormalRocketProducer normalRocketProducer; - - @Resource + @Autowired private TransactionRocketProducer transactionRocketProducer; - - @Resource + @Autowired private KafkaNormalProducer normalKafkaProducer; /** - * rabbit普通消息的处理 + * rabbitmq 普通消息 */ - @GetMapping("/rabbitMsg/sendNormal") - public void sendMq() { - normalRabbitProducer.sendMq("hello normal RabbitMsg"); + @GetMapping("/rabbit/send") + public void rabbitSend() { + normalRabbitProducer.send("hello normal RabbitMsg"); } /** - * rabbit延迟队列类型:类似生产者 + * rabbitmq 延迟队列消息 */ - @GetMapping("/rabbitMsg/sendDelay") - public void sendMessage() { - delayRabbitProducer.sendDelayMessage("Hello ttl RabbitMsg"); + @GetMapping("/rabbit/sendDelay") + public void rabbitSendDelay(long delay) { + delayRabbitProducer.sendDelayMessage("Hello ttl RabbitMsg", delay); } /** - * rockerMQ实例 + * rocketmq 发送消息 * 需要手动创建相关的Topic和group */ - @GetMapping("/rocketMq/send") - public void sendRockerMq(){ + @GetMapping("/rocket/send") + public void rocketSend(){ normalRocketProducer.sendMessage(); } - @GetMapping("/rocketMq/transactionMsg") - public void sendRockerMqTransactionMsg(){ + + /** + * rocketmq 事务消息 + */ + @GetMapping("/rocket/transaction") + public void rocketTransaction(){ transactionRocketProducer.sendTransactionMessage(); } + /** - * kafkaSpringboot集成 + * kafka 发送消息 */ - @GetMapping("/kafkaMsg/send") - public void sendKafkaMsg(){ + @GetMapping("/kafka/send") + public void kafkaSend(){ normalKafkaProducer.sendKafkaMsg(); } } diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/listener/TranscationRocketListener.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/listener/TranscationRocketListener.java similarity index 97% rename from ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/listener/TranscationRocketListener.java rename to ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/listener/TranscationRocketListener.java index 1b526f8a..d88282d1 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/listener/TranscationRocketListener.java +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/listener/TranscationRocketListener.java @@ -1,4 +1,4 @@ -package org.dromara.stream.mq.listener; +package org.dromara.stream.listener; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener; @@ -15,6 +15,7 @@ import org.springframework.stereotype.Component; @Component @RocketMQTransactionListener public class TranscationRocketListener implements RocketMQLocalTransactionListener { + @Override public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) { log.info("执行本地事务"); @@ -39,4 +40,5 @@ public class TranscationRocketListener implements RocketMQLocalTransactionListen log.info("【监听器】检查本地交易===>{}", message); return RocketMQLocalTransactionState.COMMIT; } + } diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/DelayRabbitProducer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/DelayRabbitProducer.java deleted file mode 100644 index 4c1aeacb..00000000 --- a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/DelayRabbitProducer.java +++ /dev/null @@ -1,25 +0,0 @@ -package org.dromara.stream.mq.producer.rabbitMq; - -import jakarta.annotation.Resource; -import lombok.extern.slf4j.Slf4j; -import org.dromara.stream.config.RabbitTtlQueueConfig; -import org.springframework.amqp.rabbit.core.RabbitTemplate; -import org.springframework.stereotype.Component; -import org.springframework.web.bind.annotation.GetMapping; - -/** - * @author xbhog - * @date 2024/05/25 17:15 - **/ -@Slf4j -@Component -public class DelayRabbitProducer { - @Resource - private RabbitTemplate rabbitTemplate; - - @GetMapping("/sendDelay") - public void sendDelayMessage(String message) { - rabbitTemplate.convertAndSend(RabbitTtlQueueConfig.DELAY_EXCHANGE_NAME, RabbitTtlQueueConfig.DELAY_ROUTING_KEY, message); - log.info("【生产者】Delayed message send: " + message); - } -} diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/DelayRabbitProducer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/DelayRabbitProducer.java new file mode 100644 index 00000000..bbd55a31 --- /dev/null +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/DelayRabbitProducer.java @@ -0,0 +1,30 @@ +package org.dromara.stream.producer; + +import lombok.extern.slf4j.Slf4j; +import org.dromara.stream.config.RabbitTtlQueueConfig; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Component; + +/** + * @author xbhog + * @date 2024/05/25 17:15 + **/ +@Slf4j +@Component +public class DelayRabbitProducer { + + @Autowired + private RabbitTemplate rabbitTemplate; + + public void sendDelayMessage(String message, long delay) { + rabbitTemplate.convertAndSend( + RabbitTtlQueueConfig.DELAY_EXCHANGE_NAME, + RabbitTtlQueueConfig.DELAY_ROUTING_KEY, message, message1 -> { + message1.getMessageProperties().setDelayLong(delay); + return message1; + }); + log.info("【生产者】Delayed message send: " + message); + } + +} diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/kafkaMq/KafkaNormalProducer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/KafkaNormalProducer.java similarity index 71% rename from ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/kafkaMq/KafkaNormalProducer.java rename to ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/KafkaNormalProducer.java index db3580ea..da179753 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/kafkaMq/KafkaNormalProducer.java +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/KafkaNormalProducer.java @@ -1,7 +1,7 @@ -package org.dromara.stream.mq.producer.kafkaMq; +package org.dromara.stream.producer; -import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; @@ -14,11 +14,12 @@ import java.util.concurrent.CompletableFuture; @Slf4j @Component public class KafkaNormalProducer { - @Resource + + @Autowired private KafkaTemplate kafkaTemplate; - public void sendKafkaMsg(){ - CompletableFuture send = kafkaTemplate.send("test-topic","hello", "kafkaTest"); + public void sendKafkaMsg() { + CompletableFuture send = kafkaTemplate.send("test-topic", "hello", "kafkaTest"); send.join(); } } diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/NormalRabbitProducer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/NormalRabbitProducer.java similarity index 69% rename from ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/NormalRabbitProducer.java rename to ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/NormalRabbitProducer.java index af528ff4..142e67af 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rabbitMq/NormalRabbitProducer.java +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/NormalRabbitProducer.java @@ -1,9 +1,9 @@ -package org.dromara.stream.mq.producer.rabbitMq; +package org.dromara.stream.producer; -import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.dromara.stream.config.RabbitConfig; import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** @@ -13,11 +13,10 @@ import org.springframework.stereotype.Component; @Component public class NormalRabbitProducer { - @Resource - RabbitTemplate rabbitTemplate; + @Autowired + private RabbitTemplate rabbitTemplate; - - public void sendMq(String message) { + public void send(String message) { rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, message); log.info("【生产者】Message send: " + message); } diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/NormalRocketProducer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/NormalRocketProducer.java similarity index 81% rename from ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/NormalRocketProducer.java rename to ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/NormalRocketProducer.java index ffd9d106..27583533 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/NormalRocketProducer.java +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/NormalRocketProducer.java @@ -1,9 +1,9 @@ -package org.dromara.stream.mq.producer.rocketMq; +package org.dromara.stream.producer; -import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @@ -15,10 +15,10 @@ import org.springframework.stereotype.Component; @Component public class NormalRocketProducer { - @Resource + @Autowired private RocketMQTemplate rocketMQTemplate; - public void sendMessage(){ + public void sendMessage() { SendResult sendResult = rocketMQTemplate.syncSend("TestTopic", MessageBuilder.withPayload("hello world test").build()); log.info("发送普通同步消息-msg,syncSendMessage===>{}", sendResult); } diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/TransactionRocketProducer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/TransactionRocketProducer.java similarity index 85% rename from ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/TransactionRocketProducer.java rename to ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/TransactionRocketProducer.java index de6341e3..b6477bdd 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/mq/producer/rocketMq/TransactionRocketProducer.java +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/TransactionRocketProducer.java @@ -1,11 +1,11 @@ -package org.dromara.stream.mq.producer.rocketMq; +package org.dromara.stream.producer; -import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.LocalTransactionState; import org.apache.rocketmq.client.producer.SendStatus; import org.apache.rocketmq.client.producer.TransactionSendResult; import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; @@ -21,19 +21,19 @@ import java.util.List; @Component public class TransactionRocketProducer { - @Resource + @Autowired private RocketMQTemplate rocketMQTemplate; - public void sendTransactionMessage(){ + public void sendTransactionMessage() { List tags = Arrays.asList("TAG-1", "TAG-2", "TAG-3"); for (int i = 0; i < 3; i++) { Message message = MessageBuilder.withPayload("===>事务消息-" + i).build(); //destination formats: `topicName:tags` message – message Message arg – ext arg TransactionSendResult res = rocketMQTemplate.sendMessageInTransaction("transaction_topic:" + tags.get(i), message, i + 1); if (res.getLocalTransactionState().equals(LocalTransactionState.COMMIT_MESSAGE) && res.getSendStatus().equals(SendStatus.SEND_OK)) { - log.info("【生产者】事物消息发送成功;成功结果:{}",res); - }else{ - log.info("【生产者】事务发送失败:失败原因:{}",res); + log.info("【生产者】事物消息发送成功;成功结果:{}", res); + } else { + log.info("【生产者】事务发送失败:失败原因:{}", res); } } } diff --git a/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml b/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml index f8209bca..5cc4baa2 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml +++ b/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml @@ -9,22 +9,31 @@ spring: profiles: # 环境配置 active: @profiles.active@ - #MQ配置 + +--- # rabbitmq 配置 +spring: rabbitmq: - host: 192.168.1.13 + host: localhost port: 5672 - username: mq - password: mq + username: guest + password: guest publisher-returns: true publisher-confirm-type: correlated + +--- # kafka 配置 +spring: kafka: - bootstrap-servers: 192.168.1.13:9092 + bootstrap-servers: localhost:9092 producer: value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + +--- # rocketmq 配置 rocketmq: - name-server: 192.168.1.13:9876 + name-server: localhost:9876 producer: - group: dist-test # 生产者组 + # 生产者组 + group: dist-test + --- # nacos 配置 spring: cloud: