【发布时间】:2019-11-15 04:01:24
【问题描述】:
我是 kafka-spark 流媒体的新手,并尝试使用协议缓冲区序列化器/反序列化器实现 spark 文档中的示例。到目前为止,我遵循了
上的官方教程https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html https://developers.google.com/protocol-buffers/docs/javatutorial
现在我坚持以下问题。这个问题可能和这篇帖子类似How to deserialize records from Kafka using Structured Streaming in Java?
我已经成功实现了在 kafka 主题上写入消息的序列化程序。现在的任务是使用带有自定义反序列化器的 spark 结构化流来使用它。
public class CustomDeserializer implements Deserializer<Person> {
@Override
public Person deserialize(String topic, byte[] data) {
Person person = null;
try {
person = Person.parseFrom(data);
return person;
} catch (Exception e) {
//ToDo
}
return null;
}
Dataset<Row> dataset = sparkSession.readStream()
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("subscribe", topic)
.option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
.option("value.deserializer", "de.myproject.CustomDeserializer")
.load()
.select("value");
dataset.writeStream()
.format("console")
.start()
.awaitTermination();
但作为输出,我仍然得到二进制文件。
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+
| value|
+--------------------+
|[08 AC BD BB 09 1...|
+--------------------+
-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+
| value|
+--------------------+
|[08 82 EF D8 08 1...|
+--------------------+
关于教程,我只需要将 value.deserializer 的选项设置为人类可读的格式
.option("value.deserializer", "de.myproject.CustomDeserializer")
我错过了什么吗?
【问题讨论】:
标签: java apache-spark apache-kafka protocol-buffers spark-structured-streaming