update 优化 完善kafka-stream案例

2.X
疯狂的狮子Li 10 months ago
parent 3e57a42f39
commit 0fe9c4f17d

@ -4,12 +4,11 @@
1. rabbit: 普通消息、延迟队列
2. rocket普通消息、事务消息
3. kafka普通消息
3. kafka普通消息、stream流的使用
后续可实现的:
1. kafka stream流的使用
2. rocket 顺序、异步、延时等
1. rocket 顺序、异步、延时等
## 使用方式

@ -33,7 +33,10 @@
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>
<dependency>
<groupId>org.dromara</groupId>

@ -0,0 +1,27 @@
package org.dromara.stream.config;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* kafka stream
*
* @author LionLi
*/
@Configuration
public class KafkaStreamsConfig {
@Bean
public KStream<String, String> demoStream(StreamsBuilder builder) {
// 输入主题
KStream<String, String> source = builder.stream("input-topic");
// 转换逻辑:这里只是简单地将消息转换为大写
KStream<String, String> processed = source.mapValues(value -> value.toUpperCase());
// 输出到另一个主题
processed.to("output-topic");
return source;
}
}

@ -30,6 +30,9 @@ spring:
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
streams:
properties:
application.id: kafka-streams-id # 应用ID
--- # rocketmq 配置
rocketmq:

Loading…
Cancel
Save