diff --git a/ruoyi-example/ruoyi-test-mq/README.md b/ruoyi-example/ruoyi-test-mq/README.md index b44f99ad..d560b0b9 100644 --- a/ruoyi-example/ruoyi-test-mq/README.md +++ b/ruoyi-example/ruoyi-test-mq/README.md @@ -4,12 +4,11 @@ 1. rabbit: 普通消息、延迟队列 2. rocket:普通消息、事务消息 -3. kafka:普通消息 +3. kafka:普通消息、stream流的使用 后续可实现的: -1. kafka stream流的使用 -2. rocket 顺序、异步、延时等 +1. rocket 顺序、异步、延时等 ## 使用方式 diff --git a/ruoyi-example/ruoyi-test-mq/pom.xml b/ruoyi-example/ruoyi-test-mq/pom.xml index 88cdb8f5..a37d75ee 100644 --- a/ruoyi-example/ruoyi-test-mq/pom.xml +++ b/ruoyi-example/ruoyi-test-mq/pom.xml @@ -33,7 +33,10 @@ org.springframework.kafka spring-kafka - + + org.apache.kafka + kafka-streams + org.dromara diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/KafkaStreamsConfig.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/KafkaStreamsConfig.java new file mode 100644 index 00000000..02e441a8 --- /dev/null +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/KafkaStreamsConfig.java @@ -0,0 +1,27 @@ +package org.dromara.stream.config; + +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * kafka stream 配置 + * + * @author LionLi + */ +@Configuration +public class KafkaStreamsConfig { + + @Bean + public KStream demoStream(StreamsBuilder builder) { + // 输入主题 + KStream source = builder.stream("input-topic"); + // 转换逻辑:这里只是简单地将消息转换为大写 + KStream processed = source.mapValues(value -> value.toUpperCase()); + // 输出到另一个主题 + processed.to("output-topic"); + return source; + } + +} diff --git a/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml b/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml index 28ef7553..1183287e 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml +++ b/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml @@ -30,6 +30,9 @@ spring: producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer + streams: + properties: + application.id: kafka-streams-id # 应用ID --- # rocketmq 配置 rocketmq: