【问题标题】:Why can my Kafka Consumer not read the message from my Kafka Producer?为什么我的 Kafka Consumer 无法读取来自我的 Kafka Producer 的消息?
【发布时间】:2022-12-15 01:41:58
【问题描述】:

我是 Kafka 的新手,我只是想在我的 node.js 项目中创建一个 Kafka Producer - 使用 kafkajs 包 - 可以向我的 Spring Boot 应用程序发送一个字符串。

这是我的制作人的样子:

router.put('/kafka/test', async (req, res) => {
    try {

        await producer.connect();
        console.log('kafka connected');
        await producer.send({
            topic: 'kafkaStringTest',
            messages: [
                { value: "{ \"test\": \"This is my test string\" }" }
            ]
        });

        ...

    } catch (err) {
    res.status(500).json({ message: err.message });
}

这是我的消费者的样子:

@Service
public class KafkaConsumer {

    @KafkaListener(topics = "kafkaStringTest")
    public void consume(String message) {
        System.out.println("Consumed message: " + message);
    }
}

当我请求 /kafka/test 端点时,消费者抛出以下错误:

Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [org.apache.kafka.common.serialization.StringDeserializer] to [java.lang.String] for GenericMessage [payload=org.apache.kafka.common.serialization.StringDeserializer@44e58368, headers={kafka_offset=9, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@42a9ac04, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, kafka_receivedTopic=kafkaStringTest, kafka_receivedTimestamp=1671019397060, kafka_groupId=***}]

我该如何解决这个错误?

【问题讨论】:

    标签: javascript node.js spring-boot apache-kafka kafkajs


    【解决方案1】:

    正如文档中指定的那样:

    此机制需要在您的其中一个上添加 @EnableKafka 注释 @Configuration 类和一个侦听器容器工厂,它被使用 配置底层的 ConcurrentMessageListenerContainer。经过 默认情况下,需要一个名为 kafkaListenerContainerFactory 的 bean。

    检查documentaion

    在此工厂中,您需要将 StringDeserializer 设置为消费者属性。 (我假设你在实现 Kafka 生产者时使用了 StringSerializer )

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2018-02-09
      • 1970-01-01
      • 2020-02-22
      • 1970-01-01
      • 2022-10-14
      • 2018-01-09
      • 2019-04-04
      相关资源
      最近更新 更多