【问题标题】:How to properly loop and build pyspark dataframe如何正确循环和构建 pyspark 数据框
【发布时间】:2019-11-13 18:54:12
【问题描述】:

我的 Pyspark 数据帧的逻辑已经完成,现在必须应用不同的输入并组合结果。

我想知道创建 UDF 并多次调用它是否有任何性能优势,而不是循环遍历值,如下所示。

### res_schema definition ###

twr = []
twr.append((330,900,900,18900,1))
twr.append((480,900,1800,27000,2))
twr.append((660,900,3600,36000,4))
twr.append((1440,0,25200,43200,1))
twr.append((10080,0,86400,259200,1))

results = []

for tup in twr:
    I = tup[0]
    R = tup[1]
    SR = tup[2]
    SW = tup[3]
    NR = tup[4]

    res = spark.sql(
        """
        SELECT *, LAST_VALUE(offdate) OVER (
            PARTITION BY name
            ORDER BY CAST(ondate AS timestamp) 
            RANGE BETWEEN CURRENT ROW AND INTERVAL {0} MINUTES FOLLOWING
        ) AS ld, SUM(works) OVER (
            PARTITION BY name
            ORDER BY CAST(ondate AS timestamp) 
            RANGE BETWEEN CURRENT ROW AND INTERVAL {0} MINUTES FOLLOWING
        ) AS ws
         FROM lc
         """.format(I))

    for r in res:
        results.append(r)

    ### More logic ###

resdf = spark.createDataFrame(results, res_schema)

我的附加逻辑很广泛,但仍然完全是 spark sql,所以我不确定我的运行速度慢是由于查询还是 for 循环。

【问题讨论】:

    标签: python apache-spark dataframe pyspark iteration


    【解决方案1】:

    我认为将 DF 转换为 List 然后再将其转换为 DF 并不理想。当将 DF 转换为 List 时,您将使用 pyspark 的好处,例如处理大量数据和并行性。

    您应该尝试根据 DF 上的 map、filter、reduce 函数对您的逻辑进行建模,而不是创建列表并在列表上应用逻辑。

    【讨论】:

      【解决方案2】:

      最好避免使用 UDF 以获得更好的性能,尤其是在 Pyspark 中,因为它将生成单独的 python 进程。

      接下来,如果您要从“r”行获取列值,可以尝试内置 collect_listcollect_set 函数。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-10-13
        • 1970-01-01
        • 2019-06-18
        • 2018-12-04
        • 2015-03-19
        相关资源
        最近更新 更多