【发布时间】:2021-11-15 18:42:21
【问题描述】:
我正在像这样流式传输数据:time、id、value
我只想为每个id 保留一条记录,最新的value。处理这个问题的最佳方法是什么?
更喜欢使用 Pyspark
【问题讨论】:
标签: apache-spark pyspark spark-streaming databricks
我正在像这样流式传输数据:time、id、value
我只想为每个id 保留一条记录,最新的value。处理这个问题的最佳方法是什么?
更喜欢使用 Pyspark
【问题讨论】:
标签: apache-spark pyspark spark-streaming databricks
from pyspark.sql import Window
from pyspark.sql.functions import rank, col, monotonically_increasing_id
window = Window.partitionBy("id").orderBy("time",'tiebreak')
df_s
.withColumn('tiebreak', monotonically_increasing_id())
.withColumn('rank', rank().over(window))
.filter(col('rank') == 1).drop('rank','tiebreak')
.show()
添加排名和平局以消除窗口分区之间和窗口分区内的重复或平局。
【讨论】: