From 0fe9c4f17dc06187b940732d245e9d60a41b97d8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=96=AF=E7=8B=82=E7=9A=84=E7=8B=AE=E5=AD=90Li?= <15040126243@163.com> Date: Tue, 4 Jun 2024 14:20:20 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E4=BC=98=E5=8C=96=20=E5=AE=8C?= =?UTF-8?q?=E5=96=84kafka-stream=E6=A1=88=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ruoyi-example/ruoyi-test-mq/README.md | 5 ++-- ruoyi-example/ruoyi-test-mq/pom.xml | 5 +++- .../stream/config/KafkaStreamsConfig.java | 27 +++++++++++++++++++ .../src/main/resources/application.yml | 3 +++ 4 files changed, 36 insertions(+), 4 deletions(-) create mode 100644 ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/KafkaStreamsConfig.java diff --git a/ruoyi-example/ruoyi-test-mq/README.md b/ruoyi-example/ruoyi-test-mq/README.md index b44f99ad..d560b0b9 100644 --- a/ruoyi-example/ruoyi-test-mq/README.md +++ b/ruoyi-example/ruoyi-test-mq/README.md @@ -4,12 +4,11 @@ 1. rabbit: 普通消息、延迟队列 2. rocket:普通消息、事务消息 -3. kafka:普通消息 +3. kafka:普通消息、stream流的使用 后续可实现的: -1. kafka stream流的使用 -2. rocket 顺序、异步、延时等 +1. rocket 顺序、异步、延时等 ## 使用方式 diff --git a/ruoyi-example/ruoyi-test-mq/pom.xml b/ruoyi-example/ruoyi-test-mq/pom.xml index 88cdb8f5..a37d75ee 100644 --- a/ruoyi-example/ruoyi-test-mq/pom.xml +++ b/ruoyi-example/ruoyi-test-mq/pom.xml @@ -33,7 +33,10 @@ org.springframework.kafka spring-kafka - + + org.apache.kafka + kafka-streams + org.dromara diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/KafkaStreamsConfig.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/KafkaStreamsConfig.java new file mode 100644 index 00000000..02e441a8 --- /dev/null +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/config/KafkaStreamsConfig.java @@ -0,0 +1,27 @@ +package org.dromara.stream.config; + +import org.apache.kafka.streams.StreamsBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * kafka stream 配置 + * + * @author LionLi + */ +@Configuration +public class KafkaStreamsConfig { + + @Bean + public KStream demoStream(StreamsBuilder builder) { + // 输入主题 + KStream source = builder.stream("input-topic"); + // 转换逻辑:这里只是简单地将消息转换为大写 + KStream processed = source.mapValues(value -> value.toUpperCase()); + // 输出到另一个主题 + processed.to("output-topic"); + return source; + } + +} diff --git a/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml b/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml index 28ef7553..1183287e 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml +++ b/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml @@ -30,6 +30,9 @@ spring: producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer + streams: + properties: + application.id: kafka-streams-id # 应用ID --- # rocketmq 配置 rocketmq: