【问题标题】:Pyspark: Add a new column based on a condition and distinct valuesPyspark:根据条件和不同的值添加新列
【发布时间】:2021-07-03 20:58:40
【问题描述】:

我有一个数据框。

df = spark.createDataFrame(
    [
        ['3', '2', '3', '30', '0040'],
        ['2', '5', '7', '6', '0012'],
        ['5', '8', '1', '73', '0062'],
        ['4', '2', '5', '2', '0005'],
        ['5', '2', '4', '12', '0002'],
        ['8', '3', '2', '23', '0025'],
        ['2', '2', '8', '23', '0004'],
        ['5', '5', '4', '12', '0002'],
        ['8', '2', '2', '23', '0042'],
        ['2', '2', '8', '23', '0004']
    ],
    ['col1', 'col2', 'col3', 'col4', 'col5']
)
df.show()

我想根据以下条件和不同的值添加一个新列。

cond = F.substring(F.col('col5'), 3, 1) == '0'
df1 = df.where(cond)
d_list = df1.select('col2').rdd.map(lambda x: x[0]).distinct().collect()
df2 = df.withColumn('new_col', F.when(F.col('col2').isin(d_list), F.lit('1')).otherwise('0'))
df2.show()

结果:

+----+----+----+----+----+-------+
|col1|col2|col3|col4|col5|new_col|
+----+----+----+----+----+-------+
|   3|   2|   3|  30|0040|      1|
|   2|   5|   7|   6|0012|      1|
|   5|   8|   1|  73|0062|      0|
|   4|   2|   5|   2|0005|      1|
|   5|   2|   4|  12|0002|      1|
|   8|   3|   2|  23|0025|      0|
|   2|   2|   8|  23|0004|      1|
|   5|   5|   4|  12|0002|      1|
|   8|   2|   2|  23|0042|      1|
|   2|   2|   8|  23|0004|      1|
+----+----+----+----+----+-------+

我认为这种方式不适合大型数据集。由于警告,正在寻找没有“collect()”方法的改进或替代方法:use of collect() can lead to poor spark performance

【问题讨论】:

    标签: python dataframe apache-spark pyspark apache-spark-sql


    【解决方案1】:

    您可以使用collect_set添加d_list列,并使用array_contains检查col2是否在该列中:

    from pyspark.sql import functions as F, Window
    
    df2 = df.withColumn(
        'new_col', 
        F.array_contains(
            F.collect_set(
                F.when(
                    F.substring(F.col('col5'), 3, 1) == '0', 
                    F.col('col2')
                )
            ).over(Window.partitionBy(F.lit(1))), 
            F.col('col2')
        ).cast('int')
    )
    
    df2.show()
    +----+----+----+----+----+-------+
    |col1|col2|col3|col4|col5|new_col|
    +----+----+----+----+----+-------+
    |   3|   2|   3|  30|0040|      1|
    |   2|   5|   7|   6|0012|      1|
    |   5|   8|   1|  73|0062|      0|
    |   4|   2|   5|   2|0005|      1|
    |   5|   2|   4|  12|0002|      1|
    |   8|   3|   2|  23|0025|      0|
    |   2|   2|   8|  23|0004|      1|
    |   5|   5|   4|  12|0002|      1|
    |   8|   2|   2|  23|0042|      1|
    |   2|   2|   8|  23|0004|      1|
    +----+----+----+----+----+-------+
    

    【讨论】:

      【解决方案2】:

      这是另一种方式:

      # Aggregate to get the distinct values
      df_distinct = df1.groupby('col2').count()
      
      # Join back to the orignal DF
      df = df.join(df_distinct, on='col2', how='left')
      
      # Create the required column
      df = df.withColumn('new_col', F.when(F.col('count').isNotNull(), F.lit('1')).otherwise(F.lit('0')))
      
      # drop the extraneous count column
      df = df.drop('count')
      

      您没有说明它们在col2 中可能有多少不同的值,但如果数量足够小,您可以使用广播连接来提高性能。

      【讨论】:

        【解决方案3】:

        您也可以尝试在条件为 True 的情况下设置 1,然后在 col2 上进行分区以获得最大值:

        cond = F.substring(F.col('col5'), 3, 1) == '0' 
        out = (df.withColumn("new_col",F.when(cond,1).otherwise(0))
              .withColumn("new_col",F.max("new_col").over(Window.partitionBy("col2"))))
        

        out.show()
        
        +----+----+----+----+----+-------+
        |col1|col2|col3|col4|col5|new_col|
        +----+----+----+----+----+-------+
        |   3|   2|   3|  30|0040|      1|
        |   4|   2|   5|   2|0005|      1|
        |   5|   2|   4|  12|0002|      1|
        |   2|   2|   8|  23|0004|      1|
        |   8|   2|   2|  23|0042|      1|
        |   2|   2|   8|  23|0004|      1|
        |   8|   3|   2|  23|0025|      0|
        |   2|   5|   7|   6|0012|      1|
        |   5|   5|   4|  12|0002|      1|
        |   5|   8|   1|  73|0062|      0|
        +----+----+----+----+----+-------+
        

        如果顺序很重要,请先分配一个 id,然后再分配 orderBy:

        cond = F.substring(F.col('col5'), 3, 1) == '0' 
        
        out = (df.withColumn("Idx",F.monotonically_increasing_id())
               .withColumn("new_col",F.when(cond,1).otherwise(0))
               .withColumn("new_col",F.max("new_col").over(Window.partitionBy("col2")))
               .orderBy("idx").drop("idx"))
        
        out.show()
        
        +----+----+----+----+----+-------+
        |col1|col2|col3|col4|col5|new_col|
        +----+----+----+----+----+-------+
        |   3|   2|   3|  30|0040|      1|
        |   2|   5|   7|   6|0012|      1|
        |   5|   8|   1|  73|0062|      0|
        |   4|   2|   5|   2|0005|      1|
        |   5|   2|   4|  12|0002|      1|
        |   8|   3|   2|  23|0025|      0|
        |   2|   2|   8|  23|0004|      1|
        |   5|   5|   4|  12|0002|      1|
        |   8|   2|   2|  23|0042|      1|
        |   2|   2|   8|  23|0004|      1|
        +----+----+----+----+----+-------+
        

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 2020-03-12
          • 2022-07-20
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2019-03-21
          • 2019-07-17
          相关资源
          最近更新 更多