【问题标题】:Spark structured streaming - how to queue bytes value to Kafka?Spark结构化流 - 如何将字节值排队到Kafka?
【发布时间】:2020-09-13 03:55:30
【问题描述】:

我正在编写一个使用结构化流的 Spark 应用程序。该应用程序从 Kafka 主题 topic1 读取消息,构造一条新消息,将其序列化为 Array[Byte] 并将它们发布到另一个 Kafka 主题 topic2

序列化为字节数组很重要,因为我使用topic2 的下游消费者也使用的特定序列化器/反序列化器。

不过,我在制作 Kafka 时遇到了麻烦。我什至不知道该怎么做..网上只有很多关于排队 JSON 数据的例子。

代码 -

case class OutputMessage(id: String, bytes: Array[Byte])

implicit val encoder: Encoder[OutputMessage] = org.apache.spark.sql.Encoders.kryo

val outputMessagesDataSet: DataSet[OutputMessage] = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "server1")
  .option("subscribe", "topic1")
  .load()
  .select($"value")
  .mapPartitions{r =>
     val messages: Iterator[OutputMessage] = createMessages(r)
     messages
  }

outputMessagesDataSet
  .writeStream
  .selectExpr("CAST(id AS String) AS key", "bytes AS value")
  .format("kafka")
  .option("kafka.bootstrap.servers", "server1")
  .option("topic", "topic2")
  .option("checkpointLocation", loc)
  .trigger(trigger)
  .start
  .awaitTermination

但是,这会引发异常 org.apache.spark.sql.AnalysisException: cannot resolve 'id' given input columns: [value]; line 1 pos 5;

如何以id 作为键、bytes 作为值来排队到 Kafka?

【问题讨论】:

    标签: scala apache-spark apache-kafka


    【解决方案1】:

    您可以检查“收集”消息的数据框的架构。由于您只收集“值”字段,因此传入事件以以下形式到达:

        +-------------------+
        | value             |
        +-------------------+
        | field1,field2,..  |
        +-------------------+
      
    

    您需要像在 Spark 文档中一样查询密钥:

    df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)]
    

    df.select(col("key").cast(StringType), col("value").cast(StringType))
    

    【讨论】:

      【解决方案2】:

      正如@EmiCareOfCell44 建议的那样,我打印出了架构 -

      如果我使用messagesDataSet.printSchema(),那么我只会得到一个binary 类型的值。但是如果我这样做了

      val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "server1")
      .option("subscribe", "topic1")
      .load()
      
      df.printSchema()
      

      然后打印出来

       root
        |-- key: binary (nullable = true)
        |-- value: binary (nullable = true)
        |-- topic: string (nullable = true)
        |-- partition: integer (nullable = true)
        |-- offset: long (nullable = true)
        |-- timestamp: timestamp (nullable = true)
        |-- timestampType: integer (nullable = true)
      

      但是 Dataframe 并没有进行所需的转换,这是在

      中完成的
      .mapPartitions{r =>
       val messages: Iterator[OutputMessage] = createMessages(r)
       messages
      }
      

      Dataset 的值似乎只有一个二进制值。

      我在这里搜索了一些答案,然后找到了这篇文章 - Value Type is binary after Spark Dataset mapGroups operation even return a String in the function

      我设置了一个编码器 -

      implicit val encoder: Encoder[OutputMessage] = org.apache.spark.sql.Encoders.kryo
      

      这导致值被转换为二进制。由于OutputMessage 是一个scala 类,因此不需要编码器,因此我将其删除。之后,打印出架构显示两个字段(字符串和字节,这是我想要的)。之后,.selectExpr("CAST(id AS String) AS key", "bytes AS value") 行运行良好。

      【讨论】:

        猜你喜欢
        • 2021-07-02
        • 2020-01-31
        • 1970-01-01
        • 2022-01-01
        • 2017-01-09
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-09-05
        相关资源
        最近更新 更多