【问题标题】:How to save kafka data into different location based on a column value in spark structured streaming?如何根据火花结构化流中的列值将kafka数据保存到不同的位置?
【发布时间】:2019-11-07 05:30:52
【问题描述】:

我有一个用例,我正在使用 Spark 结构化流处理来自 Kafka 的数据。我有多个要订阅的主题,根据 topic 名称,应将数据框转储到定义的位置(不同主题的不同位置)。我看到这是否可以在 spark 数据框中使用某种拆分/过滤函数来解决,但找不到。

到目前为止,我只订阅了一个主题,并且正在使用自己的书面方法将数据转储到镶木地板格式的位置。这是我目前使用的代码:

def save_as_parquet(cast_dataframe: DataFrame,output_path: 
      String,checkpointLocation: String): Unit = {
  val query = cast_dataframe.writeStream
              .format("parquet")
              .option("failOnDataLoss",true)
              .option("path",output_path)
              .option("checkpointLocation",checkpointLocation)
              .start()
              .awaitTermination()
 }

当我订阅不同的主题时,这个 cast_dataframe 也会有来自不同主题的值。我希望将数据从主题转储到仅分配给它的位置。这是怎么做到的?

【问题讨论】:

    标签: apache-spark apache-kafka spark-structured-streaming


    【解决方案1】:

    根据文档,来自 Kafka 源的每一行都具有以下架构:

    Column Type
    key binary
    value binary
    topic string
    ... ...

    假设您正在使用源选项阅读多个主题

    val kafkaInputDf = spark.readStream.format("kafka").[...]
      .option("subscribe", "topic1, topic2, topic3")
      .start()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic")
    

    然后您可以在topic 列上应用过滤器以相应地拆分数据:

    val df1 = kafkaInputDf.filter(col("topic") === "topic1")
    val df2 = kafkaInputDf.filter(col("topic") === "topic2")
    val df3 = kafkaInputDf.filter(col("topic") === "topic3")
    

    然后,您可以将这三个流数据帧 df1df2df3 接收到所需的接收器中。由于这将创建三个并行运行的流式查询,因此每个 writeStream 都有自己的检查点位置非常重要。

    【讨论】:

      【解决方案2】:

      正如official documentation中解释的那样,Dataset 可能包含可选的topic 列,可用于消息路由:

      * 如果没有指定“主题”配置选项,则需要主题列。

      值列是唯一必需的选项。如果未指定键列,则将自动添加空值键列(有关如何处理空值键值,请参阅 Kafka 语义)。如果存在主题列,则在将给定行写入 Kafka 时将其值用作主题,除非设置了“主题”配置选项,即“主题”配置选项覆盖主题列。

      【讨论】:

        猜你喜欢
        • 2019-05-26
        • 2018-11-08
        • 2015-07-06
        • 2018-02-11
        • 2020-08-18
        • 2021-05-31
        • 2019-09-20
        • 2021-07-08
        • 1970-01-01
        相关资源
        最近更新 更多