!33 add 新增 ruoyi-stream-mq 演示模块 完成 RabbitMQ RocketMQ Kafka 整合

add 新增 ruoyi-stream-mq 演示模块 完成 RabbitMQ RocketMQ Kafka 整合
2.X
疯狂的狮子Li 3 years ago
parent f045c36494
commit 6f73b542b2

@ -36,10 +36,10 @@
| RPC远程调用 | Apache Dubbo | [Apache Dubbo官网](https://dubbo.apache.org/zh/) | 原生态使用体验、高性能 |
| 分布式限流熔断 | Alibaba Sentinel | [Alibaba Sentinel文档](https://sentinelguard.io/zh-cn/) | 无侵入、高扩展 |
| 分布式事务 | Alibaba Seata | [Alibaba Seata文档](http://seata.io/zh-cn/) | 无侵入、高扩展 支持 四种模式 |
| 分布式消息队列(未完成) | SpringCloud Stream | [SpringCloud Stream文档](https://spring.io/projects/spring-cloud-stream) | 门面框架兼容各种MQ集成 |
| 分布式消息队列(未完成) | Apache Kafka | [Apache Kafka文档](https://kafka.apache.org/) | 高性能高速度 |
| 分布式消息队列(未完成) | Apache RocketMQ | [Apache RocketMQ文档](http://rocketmq.apache.org/) | 高可用功能多样 |
| 分布式消息队列(未完成) | RabbitMQ | [RabbitMQ文档](https://www.rabbitmq.com/) | 支持各种扩展插件功能多样性 |
| 分布式消息队列 | SpringCloud Stream | [SpringCloud Stream文档](https://spring.io/projects/spring-cloud-stream) | 门面框架兼容各种MQ集成 |
| 分布式消息队列 | Apache Kafka | [Apache Kafka文档](https://kafka.apache.org/) | 高性能高速度 |
| 分布式消息队列 | Apache RocketMQ | [Apache RocketMQ文档](http://rocketmq.apache.org/) | 高可用功能多样 |
| 分布式消息队列 | RabbitMQ | [RabbitMQ文档](https://www.rabbitmq.com/) | 支持各种扩展插件功能多样性 |
| 分布式搜索引擎(未完成) | ElasticSearch | [ElasticSearch官网](https://www.elastic.co/cn/elasticsearch/) | 业界知名 |
| 分布式数据同步(未完成) | Alibaba Canal | [Alibaba Canal官网](https://github.com/alibaba/canal/wiki) | 采集数据同步各种数据库 ES Redis Mysql |
| 分布式链路追踪(未完成) | Apache SkyWalking | [Apache SkyWalking文档](https://skywalking.apache.org/docs/) | 链路追踪、网格分析、度量聚合、可视化 |

@ -78,6 +78,13 @@ spring:
- Path=/demo/**
filters:
- StripPrefix=1
# MQ演示服务
- id: ruoyi-stream-mq
uri: lb://ruoyi-stream-mq
predicates:
- Path=/stream-mq/**
filters:
- StripPrefix=1
# sentinel 配置
sentinel:

@ -10,6 +10,7 @@
<modules>
<module>ruoyi-demo</module>
<module>ruoyi-stream-mq</module>
</modules>
<artifactId>ruoyi-example</artifactId>

@ -0,0 +1,99 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/POM/4.0.0"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>com.ruoyi</groupId>
<artifactId>ruoyi-example</artifactId>
<version>1.0.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>ruoyi-stream-mq</artifactId>
<description>
ruoyi-stream-mq SpringCloud-Stream-MQ 案例项目
</description>
<dependencies>
<!-- SpringCloud Alibaba Nacos -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- SpringCloud Alibaba Nacos Config -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
</dependency>
<!-- SpringCloud Alibaba Sentinel -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-sentinel</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>
<!-- SpringBoot Actuator -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>com.ruoyi</groupId>
<artifactId>ruoyi-common-security</artifactId>
</dependency>
<!-- RuoYi Common Swagger -->
<dependency>
<groupId>com.ruoyi</groupId>
<artifactId>ruoyi-common-swagger</artifactId>
</dependency>
<dependency>
<groupId>com.ruoyi</groupId>
<artifactId>ruoyi-common-web</artifactId>
</dependency>
</dependencies>
<build>
<finalName>${project.artifactId}</finalName>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot.version}</version>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>com.spotify</groupId>
<artifactId>docker-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

@ -0,0 +1,22 @@
package com.ruoyi.stream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup;
/**
* SpringCloud-Stream-MQ
*
* @author Lion Li
*/
@SpringBootApplication
public class RuoYiStreamMqApplication {
public static void main(String[] args) {
SpringApplication application = new SpringApplication(RuoYiStreamMqApplication.class);
application.setApplicationStartup(new BufferingApplicationStartup(2048));
application.run(args);
System.out.println("(♥◠‿◠)ノ゙ MQ案例模块启动成功 ლ(´ڡ`ლ)゙ ");
}
}

@ -0,0 +1,52 @@
package com.ruoyi.stream.controller;
import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport;
import com.ruoyi.common.core.domain.R;
import com.ruoyi.stream.mq.producer.DelayProducer;
import com.ruoyi.stream.mq.producer.LogStreamProducer;
import com.ruoyi.stream.mq.producer.TestStreamProducer;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@Slf4j
@RestController
@AllArgsConstructor
@RequestMapping("/test-mq")
@Api(value = "测试mq", tags = "测试mq")
public class TestMqController {
private final DelayProducer delayProducer;
private final TestStreamProducer testStreamProducer;
private final LogStreamProducer logStreamProducer;
@GetMapping("/sendRabbitmq")
@ApiOperationSupport(order = 1)
@ApiOperation(value = "发送消息Rabbitmq", notes = "发送消息")
public R<Void> sendRabbitmq(@ApiParam("消息内容") String msg, @ApiParam("延时时间") Long delay) {
delayProducer.sendMsg(msg, delay);
return R.ok();
}
@GetMapping("/sendRocketmq")
@ApiOperationSupport(order = 2)
@ApiOperation(value = "发送消息Rocketmq", notes = "发送消息")
public R<Void> sendRocketmq(@ApiParam("消息内容") String msg) {
testStreamProducer.streamTestMsg(msg);
return R.ok();
}
@GetMapping("/sendKafka")
@ApiOperationSupport(order = 3)
@ApiOperation(value = "发送消息Kafka", notes = "发送消息")
public R<Void> sendKafka(@ApiParam("消息内容") String msg) {
logStreamProducer.streamLogMsg(msg);
return R.ok();
}
}

@ -0,0 +1,20 @@
package com.ruoyi.stream.mq;
import lombok.Data;
import lombok.experimental.Accessors;
/**
* @author Lion Li
*/
@Data
@Accessors(chain = true)
public class TestMessaging {
/**
* id
*/
private String msgId;
/**
*
*/
private String msgText;
}

@ -0,0 +1,22 @@
package com.ruoyi.stream.mq.consumer;
import com.ruoyi.stream.mq.TestMessaging;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
@Slf4j
@Component
public class DelayConsumer {
@Bean
Consumer<TestMessaging> delay() {
log.info("初始化订阅");
return obj -> {
log.info("消息接收成功:" + obj);
};
}
}

@ -0,0 +1,22 @@
package com.ruoyi.stream.mq.consumer;
import com.ruoyi.stream.mq.TestMessaging;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
@Slf4j
@Component
public class LogStreamConsumer {
@Bean
Consumer<TestMessaging> log() {
log.info("初始化订阅");
return msg -> {
log.info("通过stream消费到消息 => {}", msg.toString());
};
}
}

@ -0,0 +1,22 @@
package com.ruoyi.stream.mq.consumer;
import com.ruoyi.stream.mq.TestMessaging;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
@Slf4j
@Component
public class TestStreamConsumer {
@Bean
Consumer<TestMessaging> demo() {
log.info("初始化订阅");
return msg -> {
log.info("通过stream消费到消息 => {}", msg.toString());
};
}
}

@ -0,0 +1,27 @@
package com.ruoyi.stream.mq.producer;
import com.ruoyi.stream.mq.TestMessaging;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class DelayProducer {
@Autowired
private StreamBridge streamBridge;
public void sendMsg(String msg, Long delay) {
// 构建消息对象
TestMessaging testMessaging = new TestMessaging()
.setMsgId(UUID.randomUUID().toString())
.setMsgText(msg);
Message<TestMessaging> message = MessageBuilder.withPayload(testMessaging)
.setHeader("x-delay", delay).build();
streamBridge.send("delay-out-0", message);
}
}

@ -0,0 +1,24 @@
package com.ruoyi.stream.mq.producer;
import com.ruoyi.stream.mq.TestMessaging;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class LogStreamProducer {
@Autowired
private StreamBridge streamBridge;
public void streamLogMsg(String msg) {
// 构建消息对象
TestMessaging testMessaging = new TestMessaging()
.setMsgId(UUID.randomUUID().toString())
.setMsgText(msg);
streamBridge.send("log-out-0", MessageBuilder.withPayload(testMessaging).build());
}
}

@ -0,0 +1,24 @@
package com.ruoyi.stream.mq.producer;
import com.ruoyi.stream.mq.TestMessaging;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class TestStreamProducer {
@Autowired
private StreamBridge streamBridge;
public void streamTestMsg(String msg) {
// 构建消息对象
TestMessaging testMessaging = new TestMessaging()
.setMsgId(UUID.randomUUID().toString())
.setMsgText(msg);
streamBridge.send("demo-out-0", MessageBuilder.withPayload(testMessaging).build());
}
}

@ -0,0 +1,107 @@
server:
port: 9402
# Spring
spring:
application:
# 应用名称
name: ruoyi-stream-mq
profiles:
# 环境配置
active: @profiles.active@
cloud:
stream:
function:
# 重点配置 与 binding 名与消费者对应
definition: delay;demo;log
--- # rabbitmq 配置
spring:
rabbitmq:
host: localhost
port: 5672
username: root
password: root
cloud:
stream:
rabbit:
bindings:
delay-in-0:
consumer:
delayedExchange: true
delay-out-0:
producer:
delayedExchange: true
bindings:
delay-in-0:
destination: delay.exchange.cloud
content-type: application/json
group: delay-group
binder: rabbit
delay-out-0:
destination: delay.exchange.cloud
content-type: application/json
group: delay-group
binder: rabbit
--- # rocketmq 配置
spring:
cloud:
stream:
rocketmq:
binder:
# rocketmq 地址
name-server: localhost:9876
bindings:
demo-out-0:
producer:
# 必须得写
group: default
bindings:
demo-out-0:
content-type: application/json
destination: stream-test-topic
group: test-group
binder: rocketmq
demo-in-0:
content-type: application/json
destination: stream-test-topic
group: test-group
binder: rocketmq
--- # kafka 配置
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
bindings:
log-out-0:
destination: stream-log-topic
contentType: application/json
group: log_group
binder: kafka
log-in-0:
destination: stream-log-topic
contentType: application/json
group: log_group
binder: kafka
--- # nacos 配置
spring:
cloud:
nacos:
# nacos 服务地址
server-addr: @nacos.server@
discovery:
# 注册组
group: @nacos.discovery.group@
namespace: ${spring.profiles.active}
config:
# 配置组
group: @nacos.config.group@
namespace: ${spring.profiles.active}
config:
import:
- optional:nacos:application-common.yml

@ -0,0 +1,10 @@
Spring Boot Version: ${spring-boot.version}
Spring Application Name: ${spring.application.name}
_ _
(_) | |
_ __ _ _ ___ _ _ _ ______ ___| |_ _ __ ___ __ _ _ __ ___ ______ _ __ ___ __ _
| '__| | | |/ _ \| | | | |______/ __| __| '__/ _ \/ _` | '_ ` _ \______| '_ ` _ \ / _` |
| | | |_| | (_) | |_| | | \__ \ |_| | | __/ (_| | | | | | | | | | | | | (_| |
|_| \__,_|\___/ \__, |_| |___/\__|_| \___|\__,_|_| |_| |_| |_| |_| |_|\__, |
__/ | | |
|___/ |_|

@ -0,0 +1,109 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
<!-- 日志存放路径 -->
<property name="log.path" value="logs/${project.artifactId}" />
<!-- 日志输出格式 -->
<property name="console.log.pattern"
value="%red(%d{yyyy-MM-dd HH:mm:ss}) %green([%thread]) %highlight(%-5level) %boldMagenta(%logger{36}%n) - %msg%n"/>
<property name="log.pattern" value="%d{yyyy-MM-dd HH:mm:ss} [%thread] %-5level %logger{36} - %msg%n"/>
<!-- 控制台输出 -->
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>${console.log.pattern}</pattern>
<charset>utf-8</charset>
</encoder>
</appender>
<!-- 控制台输出 -->
<appender name="file_console" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/console.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志文件名格式 -->
<fileNamePattern>${log.path}/console.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 日志最大 1天 -->
<maxHistory>1</maxHistory>
</rollingPolicy>
<encoder>
<pattern>${log.pattern}</pattern>
<charset>utf-8</charset>
</encoder>
<filter class="ch.qos.logback.classic.filter.ThresholdFilter">
<!-- 过滤的级别 -->
<level>INFO</level>
</filter>
</appender>
<!-- 系统日志输出 -->
<appender name="file_info" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/info.log</file>
<!-- 循环政策:基于时间创建日志文件 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志文件名格式 -->
<fileNamePattern>${log.path}/info.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 日志最大的历史 60天 -->
<maxHistory>60</maxHistory>
</rollingPolicy>
<encoder>
<pattern>${log.pattern}</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<!-- 过滤的级别 -->
<level>INFO</level>
<!-- 匹配时的操作:接收(记录) -->
<onMatch>ACCEPT</onMatch>
<!-- 不匹配时的操作:拒绝(不记录) -->
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<appender name="file_error" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${log.path}/error.log</file>
<!-- 循环政策:基于时间创建日志文件 -->
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<!-- 日志文件名格式 -->
<fileNamePattern>${log.path}/error.%d{yyyy-MM-dd}.log</fileNamePattern>
<!-- 日志最大的历史 60天 -->
<maxHistory>60</maxHistory>
</rollingPolicy>
<encoder>
<pattern>${log.pattern}</pattern>
</encoder>
<filter class="ch.qos.logback.classic.filter.LevelFilter">
<!-- 过滤的级别 -->
<level>ERROR</level>
<!-- 匹配时的操作:接收(记录) -->
<onMatch>ACCEPT</onMatch>
<!-- 不匹配时的操作:拒绝(不记录) -->
<onMismatch>DENY</onMismatch>
</filter>
</appender>
<!-- info异步输出 -->
<appender name="async_info" class="ch.qos.logback.classic.AsyncAppender">
<!-- 不丢失日志.默认的,如果队列的80%已满,则会丢弃TRACT、DEBUG、INFO级别的日志 -->
<discardingThreshold>0</discardingThreshold>
<!-- 更改默认的队列的深度,该值会影响性能.默认值为256 -->
<queueSize>512</queueSize>
<!-- 添加附加的appender,最多只能添加一个 -->
<appender-ref ref="file_info"/>
</appender>
<!-- error异步输出 -->
<appender name="async_error" class="ch.qos.logback.classic.AsyncAppender">
<!-- 不丢失日志.默认的,如果队列的80%已满,则会丢弃TRACT、DEBUG、INFO级别的日志 -->
<discardingThreshold>0</discardingThreshold>
<!-- 更改默认的队列的深度,该值会影响性能.默认值为256 -->
<queueSize>512</queueSize>
<!-- 添加附加的appender,最多只能添加一个 -->
<appender-ref ref="file_error"/>
</appender>
<!--系统操作日志-->
<root level="info">
<appender-ref ref="console" />
<appender-ref ref="async_info"/>
<appender-ref ref="async_error"/>
<appender-ref ref="file_console" />
</root>
</configuration>

@ -56,3 +56,6 @@ knife4j:
- name: 演示服务
uri: ${knife4j.cloud.gatewayUri}
location: /demo/v3/api-docs
- name: MQ演示服务
uri: ${knife4j.cloud.gatewayUri}
location: /stream/v3/api-docs

Loading…
Cancel
Save