【问题标题】:How to pass all windowed values to pyspark UDF如何将所有窗口值传递给 pyspark UDF
【发布时间】:2019-02-15 17:31:43
【问题描述】:

我想对数据框执行以下操作:

  1. 按列分组
  2. 窗口数据
  3. 对窗口数据执行 (udf) 自定义操作

这是我尝试过的示例代码:

from pyspark.sql import SparkSession
from pyspark.sql.types import *
ss = SparkSession.builder
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, col

sparkSession = ss.getOrCreate()

sc = sparkSession.sparkContext
sc.setLogLevel("FATAL")

df = sparkSession.createDataFrame([(17.00, "2018-03-10"),
                                   (13.00, "2018-03-11"),
                                   (25.00, "2018-03-12"),
                                   (20.00, "2018-03-13"),
                                   (17.00, "2018-03-14"),
                                   (99.00, "2018-03-15"),
                                   (156.00, "2018-03-22"),
                                   (17.00, "2018-03-31"),
                                   (25.00, "2018-03-15"),
                                   (25.00, "2018-03-16")
                                   ],
                                  ["id", "ts"])

w = F.window(col("ts").cast("timestamp"), "10 days")
windo = w.alias("window")

@udf(ArrayType(FloatType()))
def new_tuple(x):
    #print(type(x))
    return x

df.groupBy("id", windo).agg(new_tuple(F.collect_list("id"))).show(truncate=False)

上面的代码给了我想要的东西。但是,我不确定“collect_list”方法。

我也尝试过 pandas UDF。我使用 pandas 获得了预期的输出(见下文)。但是,“应用”方法不返回窗口列。

问题

  1. collect_list 是在工作节点还是驱动节点上运行?如果 collect_list 将所有结果收集到主节点,则此代码可能无法扩展。

  2. 有没有什么有效的方法可以在没有collect_list的情况下获得以下输出?

  3. 我读过 pandas UDF 是高效的。但是,我不知道如何传递/返回窗口列。

预期输出

+-----+------------------------------------------+---------------------------------+
|id   |window                                    |new_tuple(collect_list(id, 0, 0))|
+-----+------------------------------------------+---------------------------------+
|17.0 |[2018-03-29 19:00:00, 2018-04-08 19:00:00]|[17.0]                           |
|25.0 |[2018-03-09 18:00:00, 2018-03-19 19:00:00]|[25.0, 25.0, 25.0]               |
|13.0 |[2018-03-09 18:00:00, 2018-03-19 19:00:00]|[13.0]                           |
|99.0 |[2018-03-09 18:00:00, 2018-03-19 19:00:00]|[99.0]                           |
|156.0|[2018-03-19 19:00:00, 2018-03-29 19:00:00]|[156.0]                          |
|20.0 |[2018-03-09 18:00:00, 2018-03-19 19:00:00]|[20.0]                           |
|17.0 |[2018-03-09 18:00:00, 2018-03-19 19:00:00]|[17.0, 17.0]                     |
+-----+------------------------------------------+---------------------------------+

Question here 没有提供我的问题的答案。我正在对分组数据应用窗口操作。

【问题讨论】:

  • @user10465355 我已经看过您推荐的链接。这不能回答我的问题,也不能提供解决方案。例如,collect_list 将在哪里执行并将数据收集到?我还在我的问题中提到,UDAF 可以为我提供一个没有 collect_list 的列表,但与普通 UDF 相比,它不会返回窗口列

标签: python pandas apache-spark pyspark user-defined-functions


【解决方案1】:

要回答您的第三个问题,您只需要显式创建一个用于存储窗口的列,例如:

df = df.withColumn('window', F.window(col("ts").cast("timestamp"), "10 days"))
df.groupby('id', 'window').apply(pandas_udf)

这里新创建的window 列将是一列字典,其键为startend,分别表示窗口的开始时间和结束时间。您可以通过访问各个元素将其进一步展平为两列开始时间和结束时间:

df = df.withColumn('start', F.col('window')['start'])
df = df.withColumn('end', F.col('window')['end'])

然后,无论 Spark 数据帧在应用于 Pandas UDF 之前的状态是什么,都将是 UDF 接收到的 Pandas 数据帧的状态。因此,您将收到 UDF 端的窗口,并能够在转换后返回它们的值。

【讨论】:

    猜你喜欢
    • 2019-04-02
    • 1970-01-01
    • 2021-11-14
    • 2010-12-16
    • 1970-01-01
    • 2016-05-24
    • 2010-10-06
    • 1970-01-01
    相关资源
    最近更新 更多