【发布时间】:2017-06-12 00:29:29
【问题描述】:
我是 Flink 和集群计算的新手。我花了一整天的时间试图在 Flink 上正确解析来自 Kafka 的愚蠢流,但没有结果:这有点令人沮丧...... 我在 kafka 中有一个用字符串键标识的 JSON-LD 消息流。我只是想在 Flink 中检索它们,然后用不同的键分隔消息。
1) 最初我考虑将消息作为字符串而不是 JSON-LD 发送。我虽然更容易......
我尝试了所有反序列化器,但没有一个有效。简单的反序列化器显然可以工作,但它完全忽略了键。
我相信我必须使用(Flink 显然只有两个支持密钥的反序列化器):
DataStream<Object> stream = env
.addSource(new FlinkKafkaConsumer010<>("topicTest", new TypeInformationKeyValueSerializationSchema(String.class, String.class, env.getConfig()), properties))
.rebalance();
stream.print();
但我得到:
06/12/2017 02:09:12 来源:自定义来源(4/4)切换到失败 java.io.EOFException 在 org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)
如何在不丢失密钥的情况下接收流消息?
2) 我的 kafka 生产者是用 javascript 实现的,因为 Flink 支持 JSONDeserialization 我想直接在 kafka 中发送 JSON 对象。 我不确定这是否适用于 JSON-LD,但我使用过:
json.parse(jsonld_message)
将消息序列化为 json。然后我用通常的字符串键发送了这个。
但在 Flink 中这段代码不起作用:
DataStream<ObjectNode> stream = env
.addSource(new FlinkKafkaConsumer010<>("topicTest", new JSONKeyValueDeserializationSchema(false), properties))
.rebalance();
stream.print();
养一个
JsonParserException。
我认为第一种方法更简单,我更喜欢它,因为允许一次考虑一个问题(第一个:接收数据,第二个:我猜想用外部库重新转换 JSON-LD 中的字符串)。
【问题讨论】:
标签: java json deserialization apache-flink json-ld