diff --git a/README.md b/README.md index 388c15fa..453bb31c 100644 --- a/README.md +++ b/README.md @@ -36,10 +36,10 @@ | RPC远程调用 | Apache Dubbo | [Apache Dubbo官网](https://dubbo.apache.org/zh/) | 原生态使用体验、高性能 | | 分布式限流熔断 | Alibaba Sentinel | [Alibaba Sentinel文档](https://sentinelguard.io/zh-cn/) | 无侵入、高扩展 | | 分布式事务 | Alibaba Seata | [Alibaba Seata文档](http://seata.io/zh-cn/) | 无侵入、高扩展 支持 四种模式 | -| 分布式消息队列(未完成) | SpringCloud Stream | [SpringCloud Stream文档](https://spring.io/projects/spring-cloud-stream) | 门面框架兼容各种MQ集成 | -| 分布式消息队列(未完成) | Apache Kafka | [Apache Kafka文档](https://kafka.apache.org/) | 高性能高速度 | -| 分布式消息队列(未完成) | Apache RocketMQ | [Apache RocketMQ文档](http://rocketmq.apache.org/) | 高可用功能多样 | -| 分布式消息队列(未完成) | RabbitMQ | [RabbitMQ文档](https://www.rabbitmq.com/) | 支持各种扩展插件功能多样性 | +| 分布式消息队列 | SpringCloud Stream | [SpringCloud Stream文档](https://spring.io/projects/spring-cloud-stream) | 门面框架兼容各种MQ集成 | +| 分布式消息队列 | Apache Kafka | [Apache Kafka文档](https://kafka.apache.org/) | 高性能高速度 | +| 分布式消息队列 | Apache RocketMQ | [Apache RocketMQ文档](http://rocketmq.apache.org/) | 高可用功能多样 | +| 分布式消息队列 | RabbitMQ | [RabbitMQ文档](https://www.rabbitmq.com/) | 支持各种扩展插件功能多样性 | | 分布式搜索引擎(未完成) | ElasticSearch | [ElasticSearch官网](https://www.elastic.co/cn/elasticsearch/) | 业界知名 | | 分布式数据同步(未完成) | Alibaba Canal | [Alibaba Canal官网](https://github.com/alibaba/canal/wiki) | 采集数据同步各种数据库 ES Redis Mysql | | 分布式链路追踪(未完成) | Apache SkyWalking | [Apache SkyWalking文档](https://skywalking.apache.org/docs/) | 链路追踪、网格分析、度量聚合、可视化 | diff --git a/config/dev/ruoyi-gateway.yml b/config/dev/ruoyi-gateway.yml index f6834384..9d056e48 100644 --- a/config/dev/ruoyi-gateway.yml +++ b/config/dev/ruoyi-gateway.yml @@ -78,6 +78,13 @@ spring: - Path=/demo/** filters: - StripPrefix=1 + # MQ演示服务 + - id: ruoyi-stream-mq + uri: lb://ruoyi-stream-mq + predicates: + - Path=/stream-mq/** + filters: + - StripPrefix=1 # sentinel 配置 sentinel: diff --git a/ruoyi-example/pom.xml b/ruoyi-example/pom.xml index 1308afde..4d3049a5 100644 --- a/ruoyi-example/pom.xml +++ b/ruoyi-example/pom.xml @@ -10,6 +10,7 @@ ruoyi-demo + ruoyi-stream-mq ruoyi-example diff --git a/ruoyi-example/ruoyi-stream-mq/pom.xml b/ruoyi-example/ruoyi-stream-mq/pom.xml new file mode 100644 index 00000000..4fca2fc5 --- /dev/null +++ b/ruoyi-example/ruoyi-stream-mq/pom.xml @@ -0,0 +1,99 @@ + + + + com.ruoyi + ruoyi-example + 1.0.0 + + 4.0.0 + + ruoyi-stream-mq + + + ruoyi-stream-mq SpringCloud-Stream-MQ 案例项目 + + + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-discovery + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-nacos-config + + + + + com.alibaba.cloud + spring-cloud-starter-alibaba-sentinel + + + + org.springframework.cloud + spring-cloud-starter-stream-rabbit + + + + com.alibaba.cloud + spring-cloud-starter-stream-rocketmq + + + + org.springframework.cloud + spring-cloud-starter-stream-kafka + + + + + org.springframework.boot + spring-boot-starter-actuator + + + + com.ruoyi + ruoyi-common-security + + + + + com.ruoyi + ruoyi-common-swagger + + + + com.ruoyi + ruoyi-common-web + + + + + + ${project.artifactId} + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot.version} + + + + repackage + + + + + + com.spotify + docker-maven-plugin + + + + + diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/RuoYiStreamMqApplication.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/RuoYiStreamMqApplication.java new file mode 100644 index 00000000..d95b5a28 --- /dev/null +++ b/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/RuoYiStreamMqApplication.java @@ -0,0 +1,22 @@ +package com.ruoyi.stream; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.context.metrics.buffering.BufferingApplicationStartup; + +/** + * SpringCloud-Stream-MQ 案例项目 + * + * @author Lion Li + */ +@SpringBootApplication +public class RuoYiStreamMqApplication { + + public static void main(String[] args) { + SpringApplication application = new SpringApplication(RuoYiStreamMqApplication.class); + application.setApplicationStartup(new BufferingApplicationStartup(2048)); + application.run(args); + System.out.println("(♥◠‿◠)ノ゙ MQ案例模块启动成功 ლ(´ڡ`ლ)゙ "); + } + +} diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/controller/TestMqController.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/controller/TestMqController.java new file mode 100644 index 00000000..d255720f --- /dev/null +++ b/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/controller/TestMqController.java @@ -0,0 +1,52 @@ +package com.ruoyi.stream.controller; + +import com.github.xiaoymin.knife4j.annotations.ApiOperationSupport; +import com.ruoyi.common.core.domain.R; +import com.ruoyi.stream.mq.producer.DelayProducer; +import com.ruoyi.stream.mq.producer.LogStreamProducer; +import com.ruoyi.stream.mq.producer.TestStreamProducer; +import io.swagger.annotations.Api; +import io.swagger.annotations.ApiOperation; +import io.swagger.annotations.ApiParam; +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; + +@Slf4j +@RestController +@AllArgsConstructor +@RequestMapping("/test-mq") +@Api(value = "测试mq", tags = "测试mq") +public class TestMqController { + + private final DelayProducer delayProducer; + private final TestStreamProducer testStreamProducer; + private final LogStreamProducer logStreamProducer; + + @GetMapping("/sendRabbitmq") + @ApiOperationSupport(order = 1) + @ApiOperation(value = "发送消息Rabbitmq", notes = "发送消息") + public R sendRabbitmq(@ApiParam("消息内容") String msg, @ApiParam("延时时间") Long delay) { + delayProducer.sendMsg(msg, delay); + return R.ok(); + } + + @GetMapping("/sendRocketmq") + @ApiOperationSupport(order = 2) + @ApiOperation(value = "发送消息Rocketmq", notes = "发送消息") + public R sendRocketmq(@ApiParam("消息内容") String msg) { + testStreamProducer.streamTestMsg(msg); + return R.ok(); + } + + @GetMapping("/sendKafka") + @ApiOperationSupport(order = 3) + @ApiOperation(value = "发送消息Kafka", notes = "发送消息") + public R sendKafka(@ApiParam("消息内容") String msg) { + logStreamProducer.streamLogMsg(msg); + return R.ok(); + } + +} diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/TestMessaging.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/TestMessaging.java new file mode 100644 index 00000000..640b2c72 --- /dev/null +++ b/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/TestMessaging.java @@ -0,0 +1,20 @@ +package com.ruoyi.stream.mq; + +import lombok.Data; +import lombok.experimental.Accessors; + +/** + * @author Lion Li + */ +@Data +@Accessors(chain = true) +public class TestMessaging { + /** + * 消息id + */ + private String msgId; + /** + * 消息内容 + */ + private String msgText; +} diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/consumer/DelayConsumer.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/consumer/DelayConsumer.java new file mode 100644 index 00000000..3ca8463e --- /dev/null +++ b/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/consumer/DelayConsumer.java @@ -0,0 +1,22 @@ +package com.ruoyi.stream.mq.consumer; + + +import com.ruoyi.stream.mq.TestMessaging; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import java.util.function.Consumer; + +@Slf4j +@Component +public class DelayConsumer { + + @Bean + Consumer delay() { + log.info("初始化订阅"); + return obj -> { + log.info("消息接收成功:" + obj); + }; + } +} diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/consumer/LogStreamConsumer.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/consumer/LogStreamConsumer.java new file mode 100644 index 00000000..a2bea9b1 --- /dev/null +++ b/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/consumer/LogStreamConsumer.java @@ -0,0 +1,22 @@ +package com.ruoyi.stream.mq.consumer; + +import com.ruoyi.stream.mq.TestMessaging; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import java.util.function.Consumer; + +@Slf4j +@Component +public class LogStreamConsumer { + + @Bean + Consumer log() { + log.info("初始化订阅"); + return msg -> { + log.info("通过stream消费到消息 => {}", msg.toString()); + }; + } + +} diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/consumer/TestStreamConsumer.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/consumer/TestStreamConsumer.java new file mode 100644 index 00000000..9e072c38 --- /dev/null +++ b/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/consumer/TestStreamConsumer.java @@ -0,0 +1,22 @@ +package com.ruoyi.stream.mq.consumer; + +import com.ruoyi.stream.mq.TestMessaging; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.annotation.Bean; +import org.springframework.stereotype.Component; + +import java.util.function.Consumer; + +@Slf4j +@Component +public class TestStreamConsumer { + + @Bean + Consumer demo() { + log.info("初始化订阅"); + return msg -> { + log.info("通过stream消费到消息 => {}", msg.toString()); + }; + } + +} diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/producer/DelayProducer.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/producer/DelayProducer.java new file mode 100644 index 00000000..cf147265 --- /dev/null +++ b/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/producer/DelayProducer.java @@ -0,0 +1,27 @@ +package com.ruoyi.stream.mq.producer; + +import com.ruoyi.stream.mq.TestMessaging; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.messaging.Message; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; + +import java.util.UUID; + +@Component +public class DelayProducer { + + @Autowired + private StreamBridge streamBridge; + + public void sendMsg(String msg, Long delay) { + // 构建消息对象 + TestMessaging testMessaging = new TestMessaging() + .setMsgId(UUID.randomUUID().toString()) + .setMsgText(msg); + Message message = MessageBuilder.withPayload(testMessaging) + .setHeader("x-delay", delay).build(); + streamBridge.send("delay-out-0", message); + } +} diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/producer/LogStreamProducer.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/producer/LogStreamProducer.java new file mode 100644 index 00000000..fb5f5faf --- /dev/null +++ b/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/producer/LogStreamProducer.java @@ -0,0 +1,24 @@ +package com.ruoyi.stream.mq.producer; + +import com.ruoyi.stream.mq.TestMessaging; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; + +import java.util.UUID; + +@Component +public class LogStreamProducer { + + @Autowired + private StreamBridge streamBridge; + + public void streamLogMsg(String msg) { + // 构建消息对象 + TestMessaging testMessaging = new TestMessaging() + .setMsgId(UUID.randomUUID().toString()) + .setMsgText(msg); + streamBridge.send("log-out-0", MessageBuilder.withPayload(testMessaging).build()); + } +} diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/producer/TestStreamProducer.java b/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/producer/TestStreamProducer.java new file mode 100644 index 00000000..15f90034 --- /dev/null +++ b/ruoyi-example/ruoyi-stream-mq/src/main/java/com/ruoyi/stream/mq/producer/TestStreamProducer.java @@ -0,0 +1,24 @@ +package com.ruoyi.stream.mq.producer; + +import com.ruoyi.stream.mq.TestMessaging; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.cloud.stream.function.StreamBridge; +import org.springframework.messaging.support.MessageBuilder; +import org.springframework.stereotype.Component; + +import java.util.UUID; + +@Component +public class TestStreamProducer { + + @Autowired + private StreamBridge streamBridge; + + public void streamTestMsg(String msg) { + // 构建消息对象 + TestMessaging testMessaging = new TestMessaging() + .setMsgId(UUID.randomUUID().toString()) + .setMsgText(msg); + streamBridge.send("demo-out-0", MessageBuilder.withPayload(testMessaging).build()); + } +} diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/resources/application.yml b/ruoyi-example/ruoyi-stream-mq/src/main/resources/application.yml new file mode 100644 index 00000000..35b6d1f6 --- /dev/null +++ b/ruoyi-example/ruoyi-stream-mq/src/main/resources/application.yml @@ -0,0 +1,107 @@ +server: + port: 9402 + +# Spring +spring: + application: + # 应用名称 + name: ruoyi-stream-mq + profiles: + # 环境配置 + active: @profiles.active@ + cloud: + stream: + function: + # 重点配置 与 binding 名与消费者对应 + definition: delay;demo;log + +--- # rabbitmq 配置 +spring: + rabbitmq: + host: localhost + port: 5672 + username: root + password: root + cloud: + stream: + rabbit: + bindings: + delay-in-0: + consumer: + delayedExchange: true + delay-out-0: + producer: + delayedExchange: true + bindings: + delay-in-0: + destination: delay.exchange.cloud + content-type: application/json + group: delay-group + binder: rabbit + delay-out-0: + destination: delay.exchange.cloud + content-type: application/json + group: delay-group + binder: rabbit + +--- # rocketmq 配置 +spring: + cloud: + stream: + rocketmq: + binder: + # rocketmq 地址 + name-server: localhost:9876 + bindings: + demo-out-0: + producer: + # 必须得写 + group: default + bindings: + demo-out-0: + content-type: application/json + destination: stream-test-topic + group: test-group + binder: rocketmq + demo-in-0: + content-type: application/json + destination: stream-test-topic + group: test-group + binder: rocketmq + +--- # kafka 配置 +spring: + cloud: + stream: + kafka: + binder: + brokers: localhost:9092 + bindings: + log-out-0: + destination: stream-log-topic + contentType: application/json + group: log_group + binder: kafka + log-in-0: + destination: stream-log-topic + contentType: application/json + group: log_group + binder: kafka + +--- # nacos 配置 +spring: + cloud: + nacos: + # nacos 服务地址 + server-addr: @nacos.server@ + discovery: + # 注册组 + group: @nacos.discovery.group@ + namespace: ${spring.profiles.active} + config: + # 配置组 + group: @nacos.config.group@ + namespace: ${spring.profiles.active} + config: + import: + - optional:nacos:application-common.yml diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/resources/banner.txt b/ruoyi-example/ruoyi-stream-mq/src/main/resources/banner.txt new file mode 100644 index 00000000..8a8054c1 --- /dev/null +++ b/ruoyi-example/ruoyi-stream-mq/src/main/resources/banner.txt @@ -0,0 +1,10 @@ +Spring Boot Version: ${spring-boot.version} +Spring Application Name: ${spring.application.name} + _ _ + (_) | | + _ __ _ _ ___ _ _ _ ______ ___| |_ _ __ ___ __ _ _ __ ___ ______ _ __ ___ __ _ +| '__| | | |/ _ \| | | | |______/ __| __| '__/ _ \/ _` | '_ ` _ \______| '_ ` _ \ / _` | +| | | |_| | (_) | |_| | | \__ \ |_| | | __/ (_| | | | | | | | | | | | | (_| | +|_| \__,_|\___/ \__, |_| |___/\__|_| \___|\__,_|_| |_| |_| |_| |_| |_|\__, | + __/ | | | + |___/ |_| diff --git a/ruoyi-example/ruoyi-stream-mq/src/main/resources/logback.xml b/ruoyi-example/ruoyi-stream-mq/src/main/resources/logback.xml new file mode 100644 index 00000000..758a7205 --- /dev/null +++ b/ruoyi-example/ruoyi-stream-mq/src/main/resources/logback.xml @@ -0,0 +1,109 @@ + + + + + + + + + + + + ${console.log.pattern} + utf-8 + + + + + + ${log.path}/console.log + + + ${log.path}/console.%d{yyyy-MM-dd}.log + + 1 + + + ${log.pattern} + utf-8 + + + + INFO + + + + + + ${log.path}/info.log + + + + ${log.path}/info.%d{yyyy-MM-dd}.log + + 60 + + + ${log.pattern} + + + + INFO + + ACCEPT + + DENY + + + + + ${log.path}/error.log + + + + ${log.path}/error.%d{yyyy-MM-dd}.log + + 60 + + + ${log.pattern} + + + + ERROR + + ACCEPT + + DENY + + + + + + + 0 + + 512 + + + + + + + + 0 + + 512 + + + + + + + + + + + + diff --git a/ruoyi-visual/ruoyi-doc/src/main/resources/application.yml b/ruoyi-visual/ruoyi-doc/src/main/resources/application.yml index 46e2168d..68514730 100644 --- a/ruoyi-visual/ruoyi-doc/src/main/resources/application.yml +++ b/ruoyi-visual/ruoyi-doc/src/main/resources/application.yml @@ -56,3 +56,6 @@ knife4j: - name: 演示服务 uri: ${knife4j.cloud.gatewayUri} location: /demo/v3/api-docs + - name: MQ演示服务 + uri: ${knife4j.cloud.gatewayUri} + location: /stream/v3/api-docs