【问题标题】:PySpark: withColumn() with two conditions and three outcomesPySpark:withColumn() 有两个条件和三个结果
【发布时间】:2017-03-02 21:26:31
【问题描述】:

我正在使用 Spark 和 PySpark。我正在尝试实现与以下伪代码等效的结果:

df = df.withColumn('new_column', 
    IF fruit1 == fruit2 THEN 1, ELSE 0. IF fruit1 IS NULL OR fruit2 IS NULL 3.)

我正在尝试在 PySpark 中执行此操作,但我不确定语法。任何指针?我查看了expr(),但无法正常工作。

请注意,dfpyspark.sql.dataframe.DataFrame

【问题讨论】:

    标签: apache-spark hive pyspark apache-spark-sql hiveql


    【解决方案1】:

    有几种有效的方法可以实现这一点。让我们从所需的导入开始:

    from pyspark.sql.functions import col, expr, when
    

    您可以在 expr 中使用 Hive IF 函数:

    new_column_1 = expr(
        """IF(fruit1 IS NULL OR fruit2 IS NULL, 3, IF(fruit1 = fruit2, 1, 0))"""
    )
    

    when + otherwise:

    new_column_2 = when(
        col("fruit1").isNull() | col("fruit2").isNull(), 3
    ).when(col("fruit1") == col("fruit2"), 1).otherwise(0)
    

    最后你可以使用以下技巧:

    from pyspark.sql.functions import coalesce, lit
    
    new_column_3 = coalesce((col("fruit1") == col("fruit2")).cast("int"), lit(3))
    

    示例数据:

    df = sc.parallelize([
        ("orange", "apple"), ("kiwi", None), (None, "banana"), 
        ("mango", "mango"), (None, None)
    ]).toDF(["fruit1", "fruit2"])
    

    您可以按如下方式使用它:

    (df
        .withColumn("new_column_1", new_column_1)
        .withColumn("new_column_2", new_column_2)
        .withColumn("new_column_3", new_column_3))
    

    结果是:

    +------+------+------------+------------+------------+
    |fruit1|fruit2|new_column_1|new_column_2|new_column_3|
    +------+------+------------+------------+------------+
    |orange| apple|           0|           0|           0|
    |  kiwi|  null|           3|           3|           3|
    |  null|banana|           3|           3|           3|
    | mango| mango|           1|           1|           1|
    |  null|  null|           3|           3|           3|
    +------+------+------------+------------+------------+
    

    【讨论】:

    • 在 spark 2.2+ 中,函数 'col' 对我不起作用。直接使用不带引号的列名有效。例如:new_column_1 = expr(" col_1 + int(col_2/15) ")
    【解决方案2】:

    你会想要使用如下的 udf

    from pyspark.sql.types import IntegerType
    from pyspark.sql.functions import udf
    
    def func(fruit1, fruit2):
        if fruit1 == None or fruit2 == None:
            return 3
        if fruit1 == fruit2:
            return 1
        return 0
    
    func_udf = udf(func, IntegerType())
    df = df.withColumn('new_column',func_udf(df['fruit1'], df['fruit2']))
    

    【讨论】:

    • 我从这个解决方案中遇到了几个错误,@David。第一个是用from pyspark.sql.types import StringType 解决的。第二个是:TypeError: 'int' object is not callable,我不知道如何解决。请注意,dfpyspark.sql.dataframe.DataFrame
    • @user2205916 我有几个错别字。在def func(... 行中,我有fruit 1(带空格)而不是fruit1。在以func_udf =... 开头的行中,我有StringType 而不是IntegerType。使用更新后的代码尝试一下,如果仍有问题,请告诉我
    • 我收到同样的错误信息。另外,我认为df = . . . 末尾缺少一个括号
    • 呃,另一个错字,倒数第二行应该是func_udf = udf(func, IntegerType())
    • 必须运行,但这很接近(可以承受拼写错误)。如果仍然无法正常工作,请确保您没有这种情况 stackoverflow.com/questions/9767391/…
    【解决方案3】:

    pyspark 中的 withColumn 函数可以让你创建一个带有条件的新变量,添加 whenotherwise 函数,你就有了一个正常工作的 if then else 结构。

    对于所有这些,您需要导入 sparksql 函数,因为您会看到如果没有 col() 函数,以下代码将无法工作。

    在第一位,我们声明一个新列-'new column',然后给出包含在when函数中的条件(即fruit1==fruit2),如果条件为真则为1,如果不为真则控制转到然后使用 isNull() 函数处理第二个条件(fruit1 或 fruit2 为 Null)的 else,如果返回 true 3,如果 false,则再次检查 else,给出 0 作为答案。

    from pyspark.sql import functions as F
    
    df=df.withColumn('new_column', 
        F.when(F.col('fruit1')==F.col('fruit2'), 1)
        .otherwise(F.when((F.col('fruit1').isNull()) | (F.col('fruit2').isNull()), 3))
        .otherwise(0))
    

    【讨论】:

    • 你能解释一下你的代码吗?以便新人可以了解您所做的事情
    • @Nidhi,如果fruit1fruit2 来自不同的数据帧,可以执行类似的操作吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-05-25
    • 1970-01-01
    相关资源
    最近更新 更多