【问题标题】:How to consume correctly from Kafka topic with Java Spark structured streaming如何使用 Java Spark 结构化流从 Kafka 主题中正确消费
【发布时间】:2019-11-15 04:01:24
【问题描述】:

我是 kafka-spark 流媒体的新手,并尝试使用协议缓冲区序列化器/反序列化器实现 spark 文档中的示例。到目前为止,我遵循了

上的官方教程

https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html https://developers.google.com/protocol-buffers/docs/javatutorial

现在我坚持以下问题。这个问题可能和这篇帖子类似How to deserialize records from Kafka using Structured Streaming in Java?

我已经成功实现了在 kafka 主题上写入消息的序列化程序。现在的任务是使用带有自定义反序列化器的 spark 结构化流来使用它。

public class CustomDeserializer implements Deserializer<Person> {

@Override
public Person deserialize(String topic, byte[] data) {
    Person person = null;
    try {
        person = Person.parseFrom(data);

        return person;
    } catch (Exception e) {
               //ToDo
    }

    return null;
 }


Dataset<Row> dataset = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "localhost:9092")
        .option("subscribe", topic)
        .option("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        .option("value.deserializer", "de.myproject.CustomDeserializer")
        .load()
        .select("value");

    dataset.writeStream()
        .format("console")
        .start()
        .awaitTermination();

但作为输出,我仍然得到二进制文件。

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+
|               value|
+--------------------+
|[08 AC BD BB 09 1...|
+--------------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+
|               value|
+--------------------+
|[08 82 EF D8 08 1...|
+--------------------+

关于教程,我只需要将 value.deserializer 的选项设置为人类可读的格式

.option("value.deserializer", "de.myproject.CustomDeserializer")

我错过了什么吗?

【问题讨论】:

    标签: java apache-spark apache-kafka protocol-buffers spark-structured-streaming


    【解决方案1】:

    您是否错过了文档的这一部分?

    注意以下Kafka参数不能设置,Kafka source或sink会抛出异常:

    • key.deserializer:键总是使用 ByteArrayDeserializer 反序列化为字节数组。使用 DataFrame 操作显式反序列化键。
    • value.deserializer:值始终使用 ByteArrayDeserializer 反序列化为字节数组。使用 DataFrame 操作显式反序列化这些值。

    你必须注册一个 UDF 来调用你的反序列化器

    类似于Read protobuf kafka message using spark structured streaming

    【讨论】:

      【解决方案2】:

      您需要将 byte 转换为 String 数据类型。 dataset.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

      然后你可以使用函数。from_json(dataset.col("value"), StructType) 来取回实际的DF。

      快乐编码:)

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2021-11-05
        • 2018-01-16
        • 1970-01-01
        • 2019-09-24
        • 1970-01-01
        • 2019-07-21
        • 1970-01-01
        相关资源
        最近更新 更多