【问题标题】:PySpark function to create custom output based on multiple columns of dataframePySpark 函数基于多列数据框创建自定义输出
【发布时间】:2023-04-01 08:25:01
【问题描述】:

我有一个如下结构的源 pyspark 数据框:

A B C D E F G
145 589 1 1 12 25
145 589 1 2 1ad34
145 589 1 3 257 18 55
145 589 2 1 12 25
145 589 2 2 22 45
145 589 2 3
145 589 3 1 32 55
145 589 3 2

表格概览:

  1. A 和 B 列的组合将具有索引的 C 列。对于每个索引的 C 列,我们将有 D 列。 A|B|C|D 的连接标识一条唯一记录。
  2. 对于下面的完整,数据帧检查是否在数据帧记录遍历的任何点设置了 E 列。如果是,则返回第一个数值(例如 257 应该得到结果,而 1ad34 应该被忽略)这将是优先级 1 操作。
  3. 如果从未设置列 E,则返回最后一行组合的 F 和 G 的串联。如果从未在 E 列上设置 257,则根据 145|589|3|1 返回 3255。

测试用例 1:优先级列 E 包含的值很少。第一个数字是 257。所以对于 145|589,我们的输出应该是 257。

测试用例 2:优先级列 E 完全为空,然后选取 F 和 G 列的最后一个串联值,结果应为 3255 对应 145|589

我已经为此实现了一个 pyspark 代码,如下所示:

def get_resulting_id(grouped_A_B_df):
    try :
        out=''
        first_E_val_df=grouped_A_B_df.filter(col("E").cast("int").isNotNull()).first()
        if ( first_E_val_df):
            return first_E_val_df["E"]
        unique_C = [x.C for x in grouped_A_B_df.select('C').distinct().collect()]
        for uniq in unique_C :
            for row in uniq.rdd.toLocalIterator():
                out=str(row['F'])+str(row['G'])
    except:
        raise Exception("Func failed")
    return out

由于源数据帧有 2000 万条记录,我不想在优先级 2 条件下使用本地迭代器,任何可能的方式来加速操作。由 A 列和 B 列组合划分的源数据帧将给出子集数据帧。我希望将我的自定义函数应用于该子集数据帧并返回每个子集数据帧的结果。

【问题讨论】:

  • 预期输出是什么?当前数据框中的新列?或本地 python 对象,即 pandas df 或 list ?
  • Pandas df 没问题。
  • 如果您可以提供更好的样本输入,包括案例 E 和案例 F+G,那就太好了! (我们真的不需要那么多行,只需要相关行) - How to create a Minimal, Reproducible Example
  • 谢谢史蒂文!它有帮助,我想知道如何通过合并和分区从 E 列中获取第一个数值。假设 145|589|1|2 的值为 123abc,那么我们需要忽略它并获取第一个数值。 first(col("E").cast(DecimalType()).isNull(), ignorenulls=True) 可能不在这里,因为它会使第一个表达式为布尔值,第二个表达式为字符串,对吗?有什么办法吗?
  • 如果您只需要数值,您只需将其转换为 int 并将其转换回字符串:将 F.first("E", ignorenulls=True) 替换为 F.first(F.col("E").cast("int").cast("str"), ignorenulls=True)。但同样,它不在您的示例数据中,因此请使用 Minimal, Reproductible example 更新您的示例。

标签: pyspark lambda user-defined-functions rdd


【解决方案1】:

根据您提供的示例输入数据,不确定您的预期输出到底是什么。我试过你的功能,输出是“257”,所以这是我完整的 pyspark 代码,应该提供相同的输出:

from pyspark.sql import functions as F, Window as W

df.select(
    "A",
    "B",
    F.coalesce(
        F.first("E", ignorenulls=True).over(
            W.partitionBy("A", "B")
            .orderBy("C", "D")
            .rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
        ),
        F.last(F.concat(F.col("F"), F.col("G")), ignorenulls=True).over(
            W.partitionBy("A", "B")
            .orderBy("C", "D")
            .rowsBetween(W.unboundedPreceding, W.unboundedFollowing)
        ),
    ).alias("out"),
).distinct().show()

+---+---+---+                                                                   
|  A|  B|out|
+---+---+---+
|145|589|257|
+---+---+---+

如果您需要 pandas df 作为输出,您可以将 .show() 替换为 .toPandas()

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-11-22
    • 2021-08-20
    • 1970-01-01
    • 2014-07-23
    • 2018-01-04
    • 2020-01-01
    相关资源
    最近更新 更多