【问题标题】:Simulate Lag Function - Spark structured streaming模拟滞后函数 - Spark 结构化流
【发布时间】:2018-07-21 09:36:44
【问题描述】:

我正在使用 Spark Structured Streaming 来分析传感器数据,并且需要根据传感器之前的时间戳执行计算。我的传入数据流包含三列:sensor_id、timestamp 和 temp。我需要添加第四列,即传感器之前的时间戳,以便我可以计算每个传感器的数据点之间的时间。

这很容易使用传统的批处理,使用延迟函数并按 sensor_id 分组。在流媒体情况下解决此问题的最佳方法是什么?

例如,如果我的流数据帧如下所示:

+----------+-----------+------+
| SensorId | Timestamp | Temp |
+----------+-----------+------+
|     1800 |        34 |   23 |
|      500 |        36 |   54 |
|     1800 |        45 |   23 |
|      500 |        60 |   54 |
|     1800 |        78 |   23 |
+----------+-----------+------+

我想要这样的东西:

+----------+-----------+------+---------+
| SensorId | Timestamp | Temp | Prev_ts |
+----------+-----------+------+---------+
|     1800 |        34 |   23 |      21 |
|      500 |        36 |   54 |      27 |
|     1800 |        45 |   23 |      34 |
|      500 |        60 |   54 |      36 |
|     1800 |        78 |   23 |      45 |
+----------+-----------+------+---------+

如果我尝试

test = filteredData.withColumn("prev_ts", lag("ts").over(Window.partitionBy("sensor_id").orderBy("ts")))

我收到了AnalysisException: 'Non-time-based windows are not supported on streaming DataFrames/Datasets

我能否将每个传感器的先前时间戳保存在我可以引用的数据结构中,然后使用每个新时间戳进行更新?

【问题讨论】:

  • 我接受了 GroupState 的建议,并且一直在使用 this blog post,但还没有在 databricks 笔记本中得到任何工作。如果你想出什么办法,请告诉我!
  • 嗨,马特...你找到解决这个问题的方法了吗?

标签: scala apache-spark pyspark spark-structured-streaming


【解决方案1】:

没有必要“模拟”任何东西。标准窗口函数可用于结构化流。

s = spark.readStream.
   ...
   load()

s.withColumn("prev_ts", lag("Temp").over(
  Window.partitionBy("SensorId").orderBy("Timestamp")
)

【讨论】:

  • 此代码不适用于结构化流,因为不支持非基于时间的窗口。
猜你喜欢
  • 2017-05-04
  • 2012-07-03
  • 2019-08-24
  • 1970-01-01
  • 2017-03-06
  • 1970-01-01
  • 2017-04-30
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多