From 3e57a42f39a7a8109329820992fb645620d9c40b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=96=AF=E7=8B=82=E7=9A=84=E7=8B=AE=E5=AD=90Li?= <15040126243@163.com> Date: Tue, 4 Jun 2024 14:10:37 +0800 Subject: [PATCH] =?UTF-8?q?update=20=E4=BC=98=E5=8C=96=20=E5=AE=8C?= =?UTF-8?q?=E5=96=84kafka=E6=A1=88=E4=BE=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/dromara/stream/consumer/KafkaNormalConsumer.java | 6 +++--- .../org/dromara/stream/producer/KafkaNormalProducer.java | 7 ++----- .../ruoyi-test-mq/src/main/resources/application.yml | 6 +++++- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/KafkaNormalConsumer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/KafkaNormalConsumer.java index c964c265..ac59733e 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/KafkaNormalConsumer.java +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/consumer/KafkaNormalConsumer.java @@ -14,11 +14,11 @@ import org.springframework.stereotype.Component; public class KafkaNormalConsumer { //默认获取最后一条消息 - @KafkaListener(topics = "test-topic",groupId = "demo") - public void timiKafka(ConsumerRecord record){ + @KafkaListener(topics = "test-topic", groupId = "test-group-id") + public void timiKafka(ConsumerRecord record) { Object key = record.key(); Object value = record.value(); - log.info("【消费者】received the message key {},value:{}",key,value); + log.info("【消费者】received the message key {},value:{}", key, value); } } diff --git a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/KafkaNormalProducer.java b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/KafkaNormalProducer.java index da179753..3ada6953 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/KafkaNormalProducer.java +++ b/ruoyi-example/ruoyi-test-mq/src/main/java/org/dromara/stream/producer/KafkaNormalProducer.java @@ -5,8 +5,6 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.stereotype.Component; -import java.util.concurrent.CompletableFuture; - /** * @author xbhog * @date 2024/05/19 18:02 @@ -16,10 +14,9 @@ import java.util.concurrent.CompletableFuture; public class KafkaNormalProducer { @Autowired - private KafkaTemplate kafkaTemplate; + private KafkaTemplate kafkaTemplate; public void sendKafkaMsg() { - CompletableFuture send = kafkaTemplate.send("test-topic", "hello", "kafkaTest"); - send.join(); + kafkaTemplate.send("test-topic", "hello", "kafkaTest"); } } diff --git a/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml b/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml index 5cc4baa2..28ef7553 100644 --- a/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml +++ b/ruoyi-example/ruoyi-test-mq/src/main/resources/application.yml @@ -24,8 +24,12 @@ spring: spring: kafka: bootstrap-servers: localhost:9092 + consumer: + group-id: test-group-id # 消费者组ID + auto-offset-reset: earliest # 当没有偏移量或偏移量无效时,从何处开始消费 producer: - value-serializer: org.springframework.kafka.support.serializer.JsonSerializer + key-serializer: org.apache.kafka.common.serialization.StringSerializer + value-serializer: org.apache.kafka.common.serialization.StringSerializer --- # rocketmq 配置 rocketmq: