【问题标题】:How to include kafka timestamp value as columns in spark structured streaming?如何在 Spark 结构化流中包含 kafka 时间戳值作为列?
【发布时间】:2019-04-07 04:28:57
【问题描述】:

我正在寻找将 kafka 的时间戳值添加到我的 Spark 结构化流模式的解决方案。我已经从 kafka 中提取了 value 字段并制作了数据框。我的问题是,我还需要获取时间戳字段(来自 kafka)以及其他列。

这是我当前的代码:

val kafkaDatademostr = spark
  .readStream 
  .format("kafka")
  .option("kafka.bootstrap.servers","zzzz.xxx.xxx.xxx.com:9002")
  .option("subscribe","csvstream")
  .load

val interval = kafkaDatademostr.select(col("value").cast("string")).alias("csv")
  .select("csv.*")

val xmlData = interval.selectExpr("split(value,',')[0] as ddd" ,
    "split(value,',')[1] as DFW",
    "split(value,',')[2] as DTG",
    "split(value,',')[3] as CDF",
    "split(value,',')[4] as DFO",
    "split(value,',')[5] as SAD",
    "split(value,',')[6] as DER",
    "split(value,',')[7] as time_for",
    "split(value,',')[8] as fort")

如何从 kafka 获取时间戳并与其他列一起添加为列?

【问题讨论】:

    标签: scala apache-spark apache-kafka spark-structured-streaming spark-streaming-kafka


    【解决方案1】:

    时间戳包含在源架构中。只需添加“选择时间戳”即可获得如下时间戳。

    val interval = kafkaDatademostr.select(col("value").cast("string").alias("csv"), col("timestamp")).select("csv.*", "timestamp")
    

    【讨论】:

      【解决方案2】:

      在 Apache Spark 官方网页您可以找到指南:Structured Streaming + Kafka Integration Guide (Kafka broker version 0.10.0 or higher)

      您可以在此处找到有关从 Kafka 加载的 DataFrame 架构的信息。

      Kafka 来源的每一行都有以下列:

      • key - 消息键
      • 值 - 消息值
      • 主题 - 命名消息主题
      • partition - 消息来自的分区
      • offset - 消息的偏移量
      • 时间戳 - 时间戳
      • timestampType 时间戳类型

      以上所有列均可查询。 在您的示例中,您仅使用value,因此要获取时间戳只需将timestamp 添加到您的选择语句中:

        val allFields = kafkaDatademostr.selectExpr(
          s"CAST(value AS STRING) AS csv",
          s"CAST(key AS STRING) AS key",
          s"topic as topic",
          s"partition as partition",
          s"offset as offset",
          s"timestamp as timestamp",
          s"timestampType as timestampType"
        )
      

      【讨论】:

      • 如果我想从 Spark 向 Kafka 添加时间戳,而不是反过来呢?我在此处提供的链接中没有找到任何此类指南。我应该在 Kafka 或 Spark 中检查任何配置吗?
      【解决方案3】:

      就我的 Kafka 而言,我收到的是 JSON 格式的值。其中包含实际数据以及原始事件时间而不是 kafka 时间戳。下面是架构。

      val mySchema = StructType(Array(
            StructField("time", LongType),
            StructField("close", DoubleType)
          ))
      

      为了使用 Spark Structured Streaming 的 watermarking 功能,我必须将 time 字段转换为时间戳格式。

      val df1 = df.selectExpr("CAST(value AS STRING)").as[(String)]
            .select(from_json($"value", mySchema).as("data"))
            .select(col("data.time").cast("timestamp").alias("time"),col("data.close"))
      

      现在您可以将时间字段用于窗口操作以及水印目的。

      import spark.implicits._
      val windowedData = df1.withWatermark("time","1 minute")
                            .groupBy(
                                window(col("time"), "1 minute", "30 seconds"),
                                $"close"
                            ).count()
      

      我希望这个答案可以澄清。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2020-01-31
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-11-08
        相关资源
        最近更新 更多