【问题标题】:Adding dictionary keys as column name and dictionary value as the constant value of that column in Pyspark df在 Pyspark df 中添加字典键作为列名和字典值作为该列的常量值
【发布时间】:2018-12-04 07:23:24
【问题描述】:

我有一本字典 x = {'colA': 20, 'colB': 30} 和一个 pyspark df。

ID Value
1  ABC
1  BCD
1  AKB
2  CAB
2  AIK
3  KIB 

我想使用 x 创建 df1,如下所示:

ID Value colA colB
1  ABC    20.0  30.0
1  BCD    20.0  30.0
1  AKB    20.0  30.0
2  CAB    20.0  30.0
...

任何想法如何做到这一点 Pyspark。 我知道我可以像这样创建一个常量列,

df1 = df.withColumn('colA', lit(20.0))
df1 = df1.withColumn('colB', lit(30.0))

但不确定从字典中执行此操作的动态过程

【问题讨论】:

    标签: pyspark apache-spark-sql pyspark-sql


    【解决方案1】:

    有一些方法可以隐藏循环,但是执行是一样的。例如,您可以使用select

    from pyspark.sql.functions import lit
    
    df2 = df.select("*", *[lit(val).alias(key) for key, val in x.items()])
    df2.show()
    #+---+-----+----+----+
    #| ID|Value|colB|colA|
    #+---+-----+----+----+
    #|  1|  ABC|  30|  20|
    #|  1|  BCD|  30|  20|
    #|  1|  AKB|  30|  20|
    #|  2|  CAB|  30|  20|
    #|  2|  AIK|  30|  20|
    #|  3|  KIB|  30|  20|
    #+---+-----+----+----+
    

    或者functools.reducewithColumn

    from functools import reduce
    df3 = reduce(lambda df, key: df.withColumn(key, lit(x[key])), x, df)
    df3.show()
    # Same as above
    

    或者pyspark.sql.functions.structselect() and the "*" syntax

    from pyspark.sql.functions import struct
    df4 = df.withColumn('x', struct([lit(val).alias(key) for key, val in x.items()]))\
        .select("ID", "Value", "x.*")
    df4.show()
    #Same as above
    

    但是如果你看一下这些方法的执行计划,你会发现它们是完全一样的:

    df2.explain()
    #== Physical Plan ==
    #*Project [ID#44L, Value#45, 30 AS colB#151, 20 AS colA#152]
    #+- Scan ExistingRDD[ID#44L,Value#45]
    
    df3.explain()
    #== Physical Plan ==
    #*Project [ID#44L, Value#45, 30 AS colB#102, 20 AS colA#107]
    #+- Scan ExistingRDD[ID#44L,Value#45]
    
    df4.explain()
    #== Physical Plan ==
    #*Project [ID#44L, Value#45, 30 AS colB#120, 20 AS colA#121]
    #+- Scan ExistingRDD[ID#44L,Value#45]
    

    进一步如果你比较@anil的answer中的循环方法:

    df1 = df  
    for key in x:
        df1 = df1.withColumn(key, lit(x[key]))
    df1.explain()
    #== Physical Plan ==
    #*Project [ID#44L, Value#45, 30 AS colB#127, 20 AS colA#132]
    #+- Scan ExistingRDD[ID#44L,Value#45]
    

    你会发现这也是一样的。

    【讨论】:

    【解决方案2】:

    如下遍历字典

    df1 = df  
    for key in x:
        df1 = df1.withColumn(key, lit(x[key]))
    

    【讨论】:

    • 谢谢。有没有办法将所有键一起应用以一次创建所有列而无需通过循环?
    • 即使你在循环中应用它,spark 也只会执行一次,因为它是一个惰性评估......你不需要担心性能。
    • 好的。知道了。我也得到了这个解决方案,但我认为它效率低下。这就是为什么问。非常感谢您的帮助。
    猜你喜欢
    • 1970-01-01
    • 2021-10-07
    • 2022-01-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多