【问题标题】:how to fetch string messages from raw buffer objects Kafka java如何从原始缓冲区对象Kafka java中获取字符串消息
【发布时间】:2019-08-01 10:03:34
【问题描述】:

有一个 nodejs Kafka 生产者将文件内容发送到 Kafka。当 Kafka 消费者消费来自 Kafka 的消息时,它看起来像 -

{"type":"Buffer","data":[91,13 .....]

当我在 nodejs Kafka 消费者中使用 m.message.value.toString('utf8') 时,它会打印实际消息。但我需要在 java Kafka 消费者中消费。我尝试使用property.put("value.serializer.encoding", "utf8")new String(consumerRecord.value()),但仍然打印{"type":"Buffer","data":[91,13 .....]。我的问题是如何在 java 中使用由 nodejs Kafka 生产者生成的字符串格式的消息。

【问题讨论】:

    标签: java node.js apache-kafka node-kafka


    【解决方案1】:

    您应该在生产者和消费者 API 调用中使用正确的键和值序列化程序。

    Kafka 提供了一些默认的 key 和 values 序列化器,例如 StringSerializer 等。

    用于产生和使用字符串消息的字符串序列化器。

    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    例如,您可以参考生产者和消费者文档。

    https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html https://kafka.apache.org/10/javadoc/?org/apache/kafka/clients/consumer/KafkaConsumer.html

    架构注册表 当您从 nodejs 发送消息并使用 kafka 消费者消费该消息时,您需要在 java kafka 消费者中提供与消息反序列化器相同的键和值。

    处理这个不兼容问题的一种选择是使用模式注册表来存储我们使用 kafka avro 的消息的模式。 然后我们可以在 nodejs kafka producer 和 java kafka consumer 中使用该模式来消费消息。

    Nodejs kafka 生产者with avro example

    这是nodejs kafka avro 的示例

    Java Kafka 消费者使用架构注册表example

    【讨论】:

    • props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer") props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer" ")
    • 生产者没有控制权
    • 您使用哪个 API 来生成消息,例如 type ="Buffer"
    • nodejs 卡夫卡。其中有一个属性 // 如果设置为 'buffer',值将作为原始缓冲区对象返回。编码:'utf8'
    猜你喜欢
    • 2016-09-29
    • 1970-01-01
    • 1970-01-01
    • 2019-05-04
    • 1970-01-01
    • 2012-02-25
    • 1970-01-01
    • 1970-01-01
    • 2020-10-06
    相关资源
    最近更新 更多