【问题标题】:How to convert Python Pandas function to Python PySpark如何将 Python Pandas 函数转换为 Python PySpark
【发布时间】:2020-05-18 18:42:44
【问题描述】:

我目前在将 Python Pandas 函数转换为 Python PySpark 时遇到问题,因为两者都是不同的库。我想做的是有一个查询功能,然后将其应用回同一列。

这是我为 Python Pandas 所做的(年龄是我试图从中检索的数据集中的列):

Age = [1, 3, -100, -99999,  39, 60, 87, 20,  21,  77777]

def clean_age(Age):
    if Age>=0 and Age<=95:
        return Age
    else:
        return np.nan

df['Age'] = df['Age'].apply(clean_age)

它适用于 Python Pandas,但现在这是我为 Python PySpark 所做的,但它不起作用:

from pyspark.sql.types import IntegerType, IntegerType
from pyspark.sql.functions import udf

def clean_age(Age):
    if Age>=0 and Age<=95:
        return Age
    else:
        return NaN

spark.udf.register("clean_age", clean_age)
udf_myFunction = udf(clean_age, IntegerType())
new_df2 = new_df.withColumn('Age_Clean',udf_myFunction('Age'))
new_df2.show()

请告知我如何实现从 Pandas 到 PySpark 的功能。提前致谢!

【问题讨论】:

    标签: python pandas apache-spark pyspark


    【解决方案1】:

    您可能应该考虑使用pandas_udf。这是为Spark &gt;= 2.3.0 准备的(尽管对于您的复杂性而言,这可能有点过头了):

    import pandas as pd
    import pyspark.sql.functions as f
    from pyspark.sql.types import LongType
    
    
    # your function, a and b are assumed to be type pd.Series
    def my_func(a, b):
        return a * b
    
    
    pandas_func = f.pandas_udf(my_func, returnType=LongType())
    
    # create test dataframe
    x = pd.Series([1, 2, 3])
    df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
    
    # apply pandas_func
    df.select(pandas_func(f.col("x"), f.col("x"))).show()
    +-----------------+
    |pandas_func(x, x)|
    +-----------------+
    |                1|
    |                4|
    |                9|
    +-----------------+
    

    如果您希望避免使用pandas_udf,任何版本的Spark &gt;= 2.0.0 都可以利用pyspark.sql.functions.whenotherwise

    import pyspark.sql.functions as f
    
    x = pd.Series([10, 777, -3, 22])
    df = spark.createDataFrame(pd.DataFrame(x, columns=["Age"]))
    
    df.withColumn(
      "Age",
      f.when(
        (f.col("Age") >= 0) & 
        (f.col("Age") <= 95), f.col("Age")).otherwise(f.lit(None))
    )
    

    随意将df.withColumn 包装在一个函数中,然后使用参数df 调用该函数并返回df.withColumn。希望这可以帮助。

    【讨论】:

    • pandas_udf 在 Jupyter Notebook 中有效吗?我试图导入该函数,但它说无法导入名称'pandas_udf'
    • @devaaron 你的 Spark 版本是多少?您可以使用 sc.versionspark._sc.version 或从您的 SparkSessionSparkContext 返回。
    • 我通过 AWS 进行 SSH,但找不到用于 Jupyter Notebook 的 Spark 版本。它在哪个版本上重要吗?我认为它是最新版本。
    • 这确实取决于很多 - pandas_udf 是最近发布的,所以一些旧集群可能没有它。在 jupyter notebook 中执行以下操作:from pyspark.sql import SparkSession,然后是 spark = SparkSession.builder.getOrCreate(),然后得到 spark 的输出。
    • 这个spark = SparkSession.builder.getOrCreate() 会包含一些东西吗?你能告诉我完整的例子吗?我仍然无法使 pandas_udf 工作。我猜 Jupyter Notebook 可能已经很旧了。不使用 Pandas 有没有其他方法可以实现?
    【解决方案2】:

    创建 udf:

    from pyspark.sql.types import IntegerType
    age_check_udf = udf(lambda age: age if (age >= 0 and age <= 95) else np.nan, IntegerType())
    

    从数据框调用:

    new_df2 = new_df.withColumn('Age_Clean', age_check_udf(new_df.Age))
    new_df2.show()
    

    【讨论】:

    • new_df2 将是新的数据框,对吗?例如,我尝试用 new_df2.show() 调用它,但它不起作用。这是为什么?它适用于 .count() 但在我尝试显示结果时无效。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-15
    • 2023-01-18
    • 2019-11-28
    • 2018-01-23
    • 2022-01-23
    相关资源
    最近更新 更多