【问题标题】:Apache Kafka with Structured Streaming protobuf带有结构化流 protobuf 的 Apache Kafka
【发布时间】:2020-01-24 01:40:37
【问题描述】:

我正在尝试使用结构化流编写一个 Kafka 消费者(protobuf)。让我们称 protobuf 为 A,它应该在 Scala 中反序列化为字节数组 (Array[Byte])。

我尝试了所有可以在网上找到的方法,但仍然无法弄清楚如何正确解析消息 A

方法 1:从集成指南 (https://spark.apache.org/docs/2.2.0/structured-streaming-kafka-integration.html) 我应该将值转换为字符串。但即使我确实 getBytes 将字符串转换为字节以解析我的消息 A,我得到:

Exception in thread "main" java.lang.ExceptionInInitializerError
...
Caused by: com.fasterxml.jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.8

方法2:我想将值直接转换为字节数组。我会得到:

missing ')' at '['(line 1, pos 17)

== SQL ==
CAST(key AS Array[Byte])

方法 3:我按照 (https://databricks.com/blog/2017/04/26/processing-data-in-apache-kafka-with-structured-streaming-in-apache-spark-2-2.html) 编写了自己的 protobuf 反序列化器。但收到错误消息:

Schema for type A is not supported

以上三种方法大概是我在网上能找到的所有方法了。这应该是一个简单而常见的问题,所以如果有人对此有见解,请告诉我。

谢谢!

【问题讨论】:

    标签: apache-spark apache-kafka spark-structured-streaming


    【解决方案1】:

    从流式源创建的DataFrame 的架构是:

    root
     |-- key: binary (nullable = true)
     |-- value: binary (nullable = true)
     |-- topic: string (nullable = true)
     |-- partition: integer (nullable = true)
     |-- offset: long (nullable = true)
     |-- timestamp: timestamp (nullable = true)
     |-- timestampType: integer (nullable = true)
    

    所以键和值实际上在Array[Byte]。您必须在 Dataframe 操作中执行反序列化。

    例如,我有这个用于 Kafka 反序列化:

      import sparkSession.implicits._
    
      sparkSession.readStream
        .format("kafka")
        .option("subscribe", topic)
        .option(
          "kafka.bootstrap.servers",
          bootstrapServers
        )
        .load()
        .selectExpr("key", "value") // Selecting only key & value
        .as[(Array[Byte], Array[Byte])]
        .flatMap {
          case (key, value) =>
            for {
              deserializedKey <- Try {
                keyDeserializer.deserialize(topic, key)
              }.toOption
              deserializedValue <- Try {
                valueDeserializer.deserialize(topic, value)
              }.toOption
            } yield (deserializedKey, deserializedValue)
        }
    

    您需要修改它以反序列化您的 protobuf 记录。

    【讨论】:

      猜你喜欢
      • 2020-01-31
      • 1970-01-01
      • 2019-01-29
      • 2020-10-29
      • 2021-08-23
      • 2017-08-23
      • 2018-09-28
      • 2021-12-16
      • 2020-10-18
      相关资源
      最近更新 更多