【发布时间】:2018-07-21 09:36:44
【问题描述】:
我正在使用 Spark Structured Streaming 来分析传感器数据,并且需要根据传感器之前的时间戳执行计算。我的传入数据流包含三列:sensor_id、timestamp 和 temp。我需要添加第四列,即传感器之前的时间戳,以便我可以计算每个传感器的数据点之间的时间。
这很容易使用传统的批处理,使用延迟函数并按 sensor_id 分组。在流媒体情况下解决此问题的最佳方法是什么?
例如,如果我的流数据帧如下所示:
+----------+-----------+------+
| SensorId | Timestamp | Temp |
+----------+-----------+------+
| 1800 | 34 | 23 |
| 500 | 36 | 54 |
| 1800 | 45 | 23 |
| 500 | 60 | 54 |
| 1800 | 78 | 23 |
+----------+-----------+------+
我想要这样的东西:
+----------+-----------+------+---------+
| SensorId | Timestamp | Temp | Prev_ts |
+----------+-----------+------+---------+
| 1800 | 34 | 23 | 21 |
| 500 | 36 | 54 | 27 |
| 1800 | 45 | 23 | 34 |
| 500 | 60 | 54 | 36 |
| 1800 | 78 | 23 | 45 |
+----------+-----------+------+---------+
如果我尝试
test = filteredData.withColumn("prev_ts", lag("ts").over(Window.partitionBy("sensor_id").orderBy("ts")))
我收到了AnalysisException: 'Non-time-based windows are not supported on streaming DataFrames/Datasets
我能否将每个传感器的先前时间戳保存在我可以引用的数据结构中,然后使用每个新时间戳进行更新?
【问题讨论】:
-
我接受了 GroupState 的建议,并且一直在使用 this blog post,但还没有在 databricks 笔记本中得到任何工作。如果你想出什么办法,请告诉我!
-
嗨,马特...你找到解决这个问题的方法了吗?
标签: scala apache-spark pyspark spark-structured-streaming