update 优化 完善kafka案例

2.X
疯狂的狮子Li 8 months ago
parent 4c27b2609d
commit 3e57a42f39

@ -14,8 +14,8 @@ import org.springframework.stereotype.Component;
public class KafkaNormalConsumer { public class KafkaNormalConsumer {
//默认获取最后一条消息 //默认获取最后一条消息
@KafkaListener(topics = "test-topic",groupId = "demo") @KafkaListener(topics = "test-topic", groupId = "test-group-id")
public void timiKafka(ConsumerRecord record){ public void timiKafka(ConsumerRecord<String, String> record) {
Object key = record.key(); Object key = record.key();
Object value = record.value(); Object value = record.value();
log.info("【消费者】received the message key {}value{}", key, value); log.info("【消费者】received the message key {}value{}", key, value);

@ -5,8 +5,6 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.util.concurrent.CompletableFuture;
/** /**
* @author xbhog * @author xbhog
* @date 2024/05/19 18:02 * @date 2024/05/19 18:02
@ -16,10 +14,9 @@ import java.util.concurrent.CompletableFuture;
public class KafkaNormalProducer { public class KafkaNormalProducer {
@Autowired @Autowired
private KafkaTemplate kafkaTemplate; private KafkaTemplate<String, String> kafkaTemplate;
public void sendKafkaMsg() { public void sendKafkaMsg() {
CompletableFuture send = kafkaTemplate.send("test-topic", "hello", "kafkaTest"); kafkaTemplate.send("test-topic", "hello", "kafkaTest");
send.join();
} }
} }

@ -24,8 +24,12 @@ spring:
spring: spring:
kafka: kafka:
bootstrap-servers: localhost:9092 bootstrap-servers: localhost:9092
consumer:
group-id: test-group-id # 消费者组ID
auto-offset-reset: earliest # 当没有偏移量或偏移量无效时,从何处开始消费
producer: producer:
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
--- # rocketmq 配置 --- # rocketmq 配置
rocketmq: rocketmq:

Loading…
Cancel
Save