【发布时间】:2016-01-01 13:44:33
【问题描述】:
我是 Kafka、序列化和 JSON 的新手
我想要的是生产者通过 kafka 发送 JSON 文件,消费者以原始文件形式消费和使用 JSON 文件。
我能够得到它,因此 JSON 转换为字符串并通过字符串序列化器发送,然后消费者将解析字符串并重新创建 JSON 对象,但我担心这不是有效的或正确的方法(可能会丢失JSON 的字段类型)
所以我考虑制作一个 JSON 序列化程序并将其设置在我的生产者的配置中。
我在这里使用了 JsonEncoder:Kafka: writing custom serializer
但是当我现在尝试运行我的生产者时,似乎在编码器的 toBytes 函数中,try 块永远不会像我想要的那样返回任何东西
try {
bytes = objectMapper.writeValueAsString(object).getBytes();
} catch (JsonProcessingException e) {
logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
}
好像objectMapper.writeValueAsString(object).getBytes();获取我的 JSON obj ({"name":"Kate","age":25}) 并将其转换为空,
这是我制作人的运行函数
List<KeyedMessage<String,JSONObject>> msgList=new ArrayList<KeyedMessage<String,JSONObject>>();
JSONObject record = new JSONObject();
record.put("name", "Kate");
record.put("age", 25);
msgList.add(new KeyedMessage<String, JSONObject>(topic, record));
producer.send(msgList);
我错过了什么?我的原始方法(转换为字符串并发送然后重建 JSON obj)可以吗?还是不是正确的方法?
谢谢!
【问题讨论】:
-
你找到解决方案了吗?
-
在您删除的答案中,您说意识到我有不兼容的杰克逊罐子,所以问题解决了吗?
标签: json serialization apache-kafka kafka-producer-api