【发布时间】:2018-03-22 21:16:24
【问题描述】:
当我通过窗口操作应用 mapGroupsWithState 的结果以获取多个字段的聚合计数时出现错误。
输入遵循以下架构,其中可以有许多具有不同时间戳和状态值的相同 id 事件
root
|-- id: string (nullable = true)
|-- location: string (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- state: int (nullable = true)
例如:
event("abc", "a", 1, 1)
event("abc", "a", 2, 2)
event("def", "b", 1, 1)
event("def", "b", 2, 1)
event("ghi", "b", 1, 1)
通过使用 mapGroupsWithState,我只保留每个 id 的最新出现的时间戳,结果架构是相同的,但不会有重复的 id 并且每一行都将包含最新的事件
root
|-- id: string (nullable = true)
|-- location: string (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- state: int (nullable = true)
以上事件的结果:
event("abc", "a", 2, 2)
event("def", "b", 2, 1)
event("ghi", "b", 1, 1)
最后我应用 groupby 窗口操作来聚合一个位置内每个唯一状态的计数,以获得以下架构:
root
|-- location: string (nullable = true)
|-- state1: long (nullable = false)
|-- state2: long (nullable = false)
查询如下所示:
val aggDemand = df
.select($"id", $"location", $"timestamp", $"state")
.withWatermark("timestamp", "10 seconds")
.groupBy(functions.window($"timestamp", DataConstant.t15min.toString + " seconds", DataConstant.t1min.toString + " seconds"), $"location")
.agg(count(when($"state" === 1L, $"state")) as 'state1, count(when($"state" === 2L, $"state")) as 'state2)
.filter(unix_timestamp($"window.end".cast(TimestampType)) <= unix_timestamp(from_utc_timestamp(current_timestamp(), "UTC+08:00")) + DataConstant.t1min)
.filter(unix_timestamp($"window.end".cast(TimestampType)) > unix_timestamp(from_utc_timestamp(current_timestamp(), "UTC+08:00")))
.drop($"window")
针对来自 kafka 的流式数据帧/数据集运行时,我遇到了以下错误:
Exception in thread "main" org.apache.spark.sql.AnalysisException: mapGroupsWithState is not supported with aggregation on a streaming DataFrame/Dataset;;
目的是获得以下结果:
location | state 1 | state 2
-----------------------------
a | 0 | 1
b | 2 | 0
该方法适用于批处理模式,但似乎无法用于流式查询。 查询有什么问题,如何才能达到预期的结果?在执行窗口操作之前,我是否需要存储来自 mapGroupsWithState 的结果?
感谢任何帮助!
【问题讨论】:
-
你用的是什么版本的 spark?
标签: apache-spark apache-spark-sql spark-structured-streaming