【问题标题】:Kafka Serializer JSON [duplicate]卡夫卡序列化器 JSON [重复]
【发布时间】:2016-01-01 13:44:33
【问题描述】:

我是 Kafka、序列化和 JSON 的新手

我想要的是生产者通过 kafka 发送 JSON 文件,消费者以原始文件形式消费和使用 JSON 文件。

我能够得到它,因此 JSON 转换为字符串并通过字符串序列化器发送,然后消费者将解析字符串并重新创建 JSON 对象,但我担心这不是有效的或正确的方法(可能会丢失JSON 的字段类型)

所以我考虑制作一个 JSON 序列化程序并将其设置在我的生产者的配置中。

我在这里使用了 JsonEncoder:Kafka: writing custom serializer

但是当我现在尝试运行我的生产者时,似乎在编码器的 toBytes 函数中,try 块永远不会像我想要的那样返回任何东西

try {
            bytes = objectMapper.writeValueAsString(object).getBytes();

        } catch (JsonProcessingException e) {
            logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }

好像objectMapper.writeValueAsString(object).getBytes();获取我的 JSON obj ({"name":"Kate","age":25}) 并将其转换为空,

这是我制作人的运行函数

List<KeyedMessage<String,JSONObject>> msgList=new ArrayList<KeyedMessage<String,JSONObject>>();   

    JSONObject record = new JSONObject();

    record.put("name", "Kate");
    record.put("age", 25);

    msgList.add(new KeyedMessage<String, JSONObject>(topic, record));

    producer.send(msgList);

我错过了什么?我的原始方法(转换为字符串并发送然后重建 JSON obj)可以吗?还是不是正确的方法?

谢谢!

【问题讨论】:

  • 你找到解决方案了吗?
  • 在您删除的答案中,您说意识到我有不兼容的杰克逊罐子,所以问题解决了吗?

标签: json serialization apache-kafka kafka-producer-api


【解决方案1】:

嗯,为什么害怕序列化/反序列化步骤会导致数据丢失?

您可以选择使用 Confluent's Schema Registry 中包含的 Kafka JSON 序列化程序,它是免费的开源软件(免责声明:我在 Confluent 工作)。它的test suite 提供了一些示例来帮助您入门,更多详细信息在serializers and formatters 中进行了描述。这个 JSON 序列化程序和模式注册表本身的好处是它们为 Kafka 提供了与生产者和消费者客户端的透明集成。除了 JSON,如果您需要,还支持 Apache Avro。

恕我直言,在使用 JSON 与 Kafka 交谈时,就开发人员的便利性和易用性而言,这种设置是最佳选择之一——当然是 YMMV!

【讨论】:

    【解决方案2】:

    我建议将您的 JSON 事件字符串转换为字节数组,例如:

    byte[] eventBody = event.getBody();

    这将提高您的性能,并且 Kafka Consumer 还提供 JSON 解析器,可帮助您恢复 JSON。
    如果需要任何进一步的信息,请告诉我。

    【讨论】:

      猜你喜欢
      • 2017-06-19
      • 2018-01-10
      • 2018-06-23
      • 2017-01-10
      • 2019-11-13
      • 2017-01-16
      • 2016-03-06
      • 1970-01-01
      • 2020-08-27
      相关资源
      最近更新 更多