update 优化 !pr163 代码结构与重新实现 rabbitmq 延迟队列

2.X
疯狂的狮子Li 9 months ago
parent 0dac5a544f
commit dabf9b4a98

@ -73,10 +73,10 @@ spring:
filters:
- StripPrefix=1
# MQ演示服务
- id: ruoyi-stream-mq
uri: lb://ruoyi-stream-mq
- id: ruoyi-test-mq
uri: lb://ruoyi-test-mq
predicates:
- Path=/stream-mq/**
- Path=/test-mq/**
filters:
- StripPrefix=1

@ -45,7 +45,6 @@
<mapstruct-plus.version>1.3.6</mapstruct-plus.version>
<mapstruct-plus.lombok.version>0.2.0</mapstruct-plus.lombok.version>
<justauth.version>1.16.6</justauth.version>
<rocketmq-version>2.3.0</rocketmq-version>
<!-- 离线IP地址定位库 -->
<ip2region.version>2.7.0</ip2region.version>
<!-- 临时修复 fastjson 漏洞 -->
@ -60,6 +59,8 @@
<sms4j.version>3.2.1</sms4j.version>
<!-- 工作流配置 -->
<flowable.version>7.0.0</flowable.version>
<!-- mq配置 -->
<rocketmq.version>2.3.0</rocketmq.version>
<!-- 插件版本 -->
<maven-compiler-plugin.verison>3.11.0</maven-compiler-plugin.verison>
@ -389,7 +390,7 @@
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>${rocketmq-version}</version>
<version>${rocketmq.version}</version>
</dependency>
</dependencies>

@ -2,12 +2,6 @@
## 模块说明
该模块基于需求【[将spring-cloud-stream改为普通spring的mq依赖用法】下修改编写
原模块缺点:功能复杂学习成本高 大部分用户用不明白 功能封闭 特性无法使用 项目中基本不会有切换mq的事情发生
现模块集成基础的rabbit、rocketmq、kafka等主流的中间件功能包含
1. rabbit: 普通消息、延迟队列
2. rocket普通消息、事务消息
3. kafka普通消息
@ -17,50 +11,6 @@
1. kafka stream流的使用
2. rocket 顺序、异步、延时等
## 项目目录
```xml
├─src
│ └─main
│ ├─java
│ │ └─org
│ │ └─dromara
│ │ └─stream
│ │ │ RuoYiTestMqApplication.java
│ │ │
│ │ ├─config
│ │ │ RabbitConfig.java 普通消息配置类
│ │ │ RabbitTtlQueueConfig.java 延迟队列配置类
│ │ │
│ │ ├─controller 测试类
│ │ │ PushMessageController.java
│ │ │
│ │ └─mq
│ │ ├─consumer
│ │ │ ├─kafkaMq
│ │ │ │ KafkaNormalConsumer.java
│ │ │ ├─rabbit
│ │ │ │ ConsumerListener.java
│ │ │ └─rocketmq
│ │ │ NormalRocketConsumer.java
│ │ │ TransactionRocketConsumer.java
│ │ ├─listener
│ │ │ TranscationRocketListener.java
│ │ └─producer
│ │ ├─kafkaMq
│ │ │ KafkaNormalProducer.java
│ │ ├─rabbitMq
│ │ │ DelayRabbitProducer.java
│ │ │ NormalRabbitProducer.java
│ │ └─rocketMq
│ │ NormalRocketProducer.java
│ │ TransactionRocketProducer.java
│ │
│ └─resources
│ application.yml IP:Host根据实际情况替换
│ logback-plus.xml
```
## 使用方式
rocketmq
@ -96,7 +46,3 @@ kafka-topics.sh --create --topic <topic_name> --bootstrap-server <broker_list> -
```shell
kafka-topics.sh --create --topic my_topic --bootstrap-server localhost:9092 --partitions 3 --replication-factor 1
```
## 验证方式
可通过`PushMessageController`实现`Restful`进行测试;

@ -16,6 +16,7 @@ public class RabbitConfig {
public static final String EXCHANGE_NAME = "demo-exchange";
public static final String QUEUE_NAME = "demo-queue";
public static final String ROUTING_KEY = "demo.routing.key";
/**
*
* ExchangeBuilder
@ -29,6 +30,7 @@ public class RabbitConfig {
public TopicExchange exchange() {
return new TopicExchange(EXCHANGE_NAME);
}
/**
*
* durable
@ -38,6 +40,7 @@ public class RabbitConfig {
public Queue queue() {
return new Queue(QUEUE_NAME, false);
}
/**
*
* bing

@ -1,13 +1,15 @@
package org.dromara.stream.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.Map;
/**
* RabbitTTL
*
* @author xbhog
*/
@Configuration
@ -26,44 +28,58 @@ public class RabbitTtlQueueConfig {
public static final String DEAD_LETTER_QUEUE = "dlx-queue";
// 死信路由键名称
public static final String DEAD_LETTER_ROUTING_KEY = "dlx.routing.key";
// 延迟消息的默认 TTL毫秒
@Value("${rabbitmq.delay.ttl:5000}")
private long messageTTL;
// 声明延迟队列
/**
*
*/
@Bean
public Queue delayQueue() {
return QueueBuilder.durable(DELAY_QUEUE_NAME)
.withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE)
.withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY)
.withArgument("x-message-ttl", messageTTL)
.deadLetterExchange(DEAD_LETTER_EXCHANGE)
.deadLetterRoutingKey(DEAD_LETTER_ROUTING_KEY)
.build();
}
// 声明延迟交换机
/**
*
*/
@Bean
public TopicExchange delayExchange() {
return new TopicExchange(DELAY_EXCHANGE_NAME);
public CustomExchange delayExchange() {
return new CustomExchange(DELAY_EXCHANGE_NAME, "x-delayed-message",
true, false, Map.of("x-delayed-type", "direct"));
}
// 将延迟队列绑定到延迟交换机
/**
*
*/
@Bean
public Binding delayBinding(Queue delayQueue, TopicExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY);
public Binding delayBinding(Queue delayQueue, CustomExchange delayExchange) {
return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
}
// 声明死信队列
/**
*
*/
@Bean
public Queue deadLetterQueue() {
return new Queue(DEAD_LETTER_QUEUE);
}
// 声明死信交换机
/**
*
*/
@Bean
public TopicExchange deadLetterExchange() {
return new TopicExchange(DEAD_LETTER_EXCHANGE);
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}
// 将死信队列绑定到死信交换机
/**
*
*/
@Bean
public Binding deadLetterBinding(Queue deadLetterQueue, TopicExchange deadLetterExchange) {
public Binding deadLetterBinding(Queue deadLetterQueue, DirectExchange deadLetterExchange) {
return BindingBuilder.bind(deadLetterQueue).to(deadLetterExchange).with(DEAD_LETTER_ROUTING_KEY);
}
}

@ -1,4 +1,4 @@
package org.dromara.stream.mq.consumer.rabbit;
package org.dromara.stream.consumer;
import lombok.extern.slf4j.Slf4j;
import org.dromara.stream.config.RabbitConfig;

@ -1,4 +1,4 @@
package org.dromara.stream.mq.consumer.kafkaMq;
package org.dromara.stream.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;

@ -1,4 +1,4 @@
package org.dromara.stream.mq.consumer.rocketmq;
package org.dromara.stream.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

@ -1,4 +1,4 @@
package org.dromara.stream.mq.consumer.rocketmq;
package org.dromara.stream.consumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;

@ -1,12 +1,8 @@
package org.dromara.stream.controller;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.dromara.stream.mq.producer.kafkaMq.KafkaNormalProducer;
import org.dromara.stream.mq.producer.rabbitMq.DelayRabbitProducer;
import org.dromara.stream.mq.producer.rabbitMq.NormalRabbitProducer;
import org.dromara.stream.mq.producer.rocketMq.NormalRocketProducer;
import org.dromara.stream.mq.producer.rocketMq.TransactionRocketProducer;
import org.dromara.stream.producer.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@ -16,57 +12,58 @@ import org.springframework.web.bind.annotation.RestController;
*/
@Slf4j
@RestController
@RequestMapping("push/message")
@RequestMapping
public class PushMessageController {
@Resource
@Autowired
private NormalRabbitProducer normalRabbitProducer;
@Resource
@Autowired
private DelayRabbitProducer delayRabbitProducer;
@Resource
@Autowired
private NormalRocketProducer normalRocketProducer;
@Resource
@Autowired
private TransactionRocketProducer transactionRocketProducer;
@Resource
@Autowired
private KafkaNormalProducer normalKafkaProducer;
/**
* rabbit
* rabbitmq
*/
@GetMapping("/rabbitMsg/sendNormal")
public void sendMq() {
normalRabbitProducer.sendMq("hello normal RabbitMsg");
@GetMapping("/rabbit/send")
public void rabbitSend() {
normalRabbitProducer.send("hello normal RabbitMsg");
}
/**
* rabbit
* rabbitmq
*/
@GetMapping("/rabbitMsg/sendDelay")
public void sendMessage() {
delayRabbitProducer.sendDelayMessage("Hello ttl RabbitMsg");
@GetMapping("/rabbit/sendDelay")
public void rabbitSendDelay(long delay) {
delayRabbitProducer.sendDelayMessage("Hello ttl RabbitMsg", delay);
}
/**
* rockerMQ
* rocketmq
* Topicgroup
*/
@GetMapping("/rocketMq/send")
public void sendRockerMq(){
@GetMapping("/rocket/send")
public void rocketSend(){
normalRocketProducer.sendMessage();
}
@GetMapping("/rocketMq/transactionMsg")
public void sendRockerMqTransactionMsg(){
/**
* rocketmq
*/
@GetMapping("/rocket/transaction")
public void rocketTransaction(){
transactionRocketProducer.sendTransactionMessage();
}
/**
* kafkaSpringboot
* kafka
*/
@GetMapping("/kafkaMsg/send")
public void sendKafkaMsg(){
@GetMapping("/kafka/send")
public void kafkaSend(){
normalKafkaProducer.sendKafkaMsg();
}
}

@ -1,4 +1,4 @@
package org.dromara.stream.mq.listener;
package org.dromara.stream.listener;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQTransactionListener;
@ -15,6 +15,7 @@ import org.springframework.stereotype.Component;
@Component
@RocketMQTransactionListener
public class TranscationRocketListener implements RocketMQLocalTransactionListener {
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object o) {
log.info("执行本地事务");
@ -39,4 +40,5 @@ public class TranscationRocketListener implements RocketMQLocalTransactionListen
log.info("【监听器】检查本地交易===>{}", message);
return RocketMQLocalTransactionState.COMMIT;
}
}

@ -1,25 +0,0 @@
package org.dromara.stream.mq.producer.rabbitMq;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.dromara.stream.config.RabbitTtlQueueConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
/**
* @author xbhog
* @date 2024/05/25 17:15
**/
@Slf4j
@Component
public class DelayRabbitProducer {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendDelay")
public void sendDelayMessage(String message) {
rabbitTemplate.convertAndSend(RabbitTtlQueueConfig.DELAY_EXCHANGE_NAME, RabbitTtlQueueConfig.DELAY_ROUTING_KEY, message);
log.info("【生产者】Delayed message send: " + message);
}
}

@ -0,0 +1,30 @@
package org.dromara.stream.producer;
import lombok.extern.slf4j.Slf4j;
import org.dromara.stream.config.RabbitTtlQueueConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
* @author xbhog
* @date 2024/05/25 17:15
**/
@Slf4j
@Component
public class DelayRabbitProducer {
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendDelayMessage(String message, long delay) {
rabbitTemplate.convertAndSend(
RabbitTtlQueueConfig.DELAY_EXCHANGE_NAME,
RabbitTtlQueueConfig.DELAY_ROUTING_KEY, message, message1 -> {
message1.getMessageProperties().setDelayLong(delay);
return message1;
});
log.info("【生产者】Delayed message send: " + message);
}
}

@ -1,7 +1,7 @@
package org.dromara.stream.mq.producer.kafkaMq;
package org.dromara.stream.producer;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;
@ -14,7 +14,8 @@ import java.util.concurrent.CompletableFuture;
@Slf4j
@Component
public class KafkaNormalProducer {
@Resource
@Autowired
private KafkaTemplate kafkaTemplate;
public void sendKafkaMsg() {

@ -1,9 +1,9 @@
package org.dromara.stream.mq.producer.rabbitMq;
package org.dromara.stream.producer;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.dromara.stream.config.RabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
/**
@ -13,11 +13,10 @@ import org.springframework.stereotype.Component;
@Component
public class NormalRabbitProducer {
@Resource
RabbitTemplate rabbitTemplate;
@Autowired
private RabbitTemplate rabbitTemplate;
public void sendMq(String message) {
public void send(String message) {
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.ROUTING_KEY, message);
log.info("【生产者】Message send: " + message);
}

@ -1,9 +1,9 @@
package org.dromara.stream.mq.producer.rocketMq;
package org.dromara.stream.producer;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@ -15,7 +15,7 @@ import org.springframework.stereotype.Component;
@Component
public class NormalRocketProducer {
@Resource
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendMessage() {

@ -1,11 +1,11 @@
package org.dromara.stream.mq.producer.rocketMq;
package org.dromara.stream.producer;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.SendStatus;
import org.apache.rocketmq.client.producer.TransactionSendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
@ -21,7 +21,7 @@ import java.util.List;
@Component
public class TransactionRocketProducer {
@Resource
@Autowired
private RocketMQTemplate rocketMQTemplate;
public void sendTransactionMessage() {

@ -9,22 +9,31 @@ spring:
profiles:
# 环境配置
active: @profiles.active@
#MQ配置
--- # rabbitmq 配置
spring:
rabbitmq:
host: 192.168.1.13
host: localhost
port: 5672
username: mq
password: mq
username: guest
password: guest
publisher-returns: true
publisher-confirm-type: correlated
--- # kafka 配置
spring:
kafka:
bootstrap-servers: 192.168.1.13:9092
bootstrap-servers: localhost:9092
producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
--- # rocketmq 配置
rocketmq:
name-server: 192.168.1.13:9876
name-server: localhost:9876
producer:
group: dist-test # 生产者组
# 生产者组
group: dist-test
--- # nacos 配置
spring:
cloud:

Loading…
Cancel
Save