【问题标题】:Error when using mapGroupsWithState in Spark Structured Streaming在 Spark 结构化流中使用 mapGroupsWithState 时出错
【发布时间】:2018-03-22 21:16:24
【问题描述】:

当我通过窗口操作应用 mapGroupsWithState 的结果以获取多个字段的聚合计数时出现错误。

输入遵循以下架构,其中可以有许多具有不同时间戳和状态值的相同 id 事件

root
|-- id: string (nullable = true)
|-- location: string (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- state: int (nullable = true)

例如:

event("abc", "a", 1, 1)
event("abc", "a", 2, 2)
event("def", "b", 1, 1)
event("def", "b", 2, 1)
event("ghi", "b", 1, 1)

通过使用 mapGroupsWithState,我只保留每个 id 的最新出现的时间戳,结果架构是相同的,但不会有重复的 id 并且每一行都将包含最新的事件

root
|-- id: string (nullable = true)
|-- location: string (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- state: int (nullable = true)

以上事件的结果:

event("abc", "a", 2, 2)
event("def", "b", 2, 1)
event("ghi", "b", 1, 1)

最后我应用 groupby 窗口操作来聚合一个位置内每个唯一状态的计数,以获得以下架构:

root
 |-- location: string (nullable = true)
 |-- state1: long (nullable = false)
 |-- state2: long (nullable = false)

查询如下所示:

val aggDemand = df
  .select($"id", $"location", $"timestamp", $"state")
  .withWatermark("timestamp", "10 seconds")
  .groupBy(functions.window($"timestamp", DataConstant.t15min.toString + " seconds", DataConstant.t1min.toString + " seconds"), $"location")
  .agg(count(when($"state" === 1L, $"state")) as 'state1, count(when($"state" === 2L, $"state")) as 'state2)
  .filter(unix_timestamp($"window.end".cast(TimestampType)) <= unix_timestamp(from_utc_timestamp(current_timestamp(), "UTC+08:00")) + DataConstant.t1min)
  .filter(unix_timestamp($"window.end".cast(TimestampType)) > unix_timestamp(from_utc_timestamp(current_timestamp(), "UTC+08:00")))
  .drop($"window")

针对来自 kafka 的流式数据帧/数据集运行时,我遇到了以下错误:

Exception in thread "main" org.apache.spark.sql.AnalysisException: mapGroupsWithState is not supported with aggregation on a streaming DataFrame/Dataset;;

目的是获得以下结果:

location | state 1 | state 2
-----------------------------
    a    |    0    |    1
    b    |    2    |    0 

该方法适用于批处理模式,但似乎无法用于流式查询。 查询有什么问题,如何才能达到预期的结果?在执行窗口操作之前,我是否需要存储来自 mapGroupsWithState 的结果?

感谢任何帮助!

【问题讨论】:

  • 你用的是什么版本的 spark?

标签: apache-spark apache-spark-sql spark-structured-streaming


【解决方案1】:

Struct Stream 有很多限制。它不是 Spark Streaming 的替代方案。

在火花流中,您可以在 mapWithState 函数中以相同的结果完成您的问题。

检查此链接

        case m: FlatMapGroupsWithState if m.isStreaming =>

      // Check compatibility with output modes and aggregations in query
      val aggsAfterFlatMapGroups = collectStreamingAggregates(plan)

      if (m.isMapGroupsWithState) {                       // check mapGroupsWithState
        // allowed only in update query output mode and without aggregation
        if (aggsAfterFlatMapGroups.nonEmpty) {
          throwError(
            "mapGroupsWithState is not supported with aggregation " +
              "on a streaming DataFrame/Dataset")
        } else if (outputMode != InternalOutputModes.Update) {
          throwError(
            "mapGroupsWithState is not supported with " +
              s"$outputMode output mode on a streaming DataFrame/Dataset")
        }

UnsupportedOperationChecker

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-05-04
    • 1970-01-01
    • 1970-01-01
    • 2019-04-06
    • 2019-12-21
    • 1970-01-01
    相关资源
    最近更新 更多