【问题标题】:How to Modify a cell/s value based on a condition in Pyspark dataframe如何根据 Pyspark 数据框中的条件修改单元格/s 值
【发布时间】:2018-01-29 11:48:06
【问题描述】:

我有一个数据框,其中包含如下几列:

 类别|类别ID|桶|道具计数|事件计数 | accum_prop_count | accum_event_count
-------------------------------------------------- -------------------------------------------------- -
民族|民族| 1 | 222 |第444章555 |第6677章

此数据框从 0 行开始,我的脚本的每个函数都在其中添加一行。

有一个功能需要根据条件修改 1 或 2 个单元格值。如何做到这一点?

代码:

schema = StructType([StructField("category", StringType()), StructField("category_id", StringType()), StructField("bucket", StringType()), StructField("prop_count", StringType()), StructField("event_count", StringType()), StructField("accum_prop_count",StringType())])
a_df = sqlContext.createDataFrame([],schema)

a_temp = sqlContext.createDataFrame([("nation","nation",1,222,444,555)],schema)
a_df = a_df.unionAll(a_temp)

从其他函数添加的行:

a_temp3 = sqlContext.createDataFrame([("nation","state",2,222,444,555)],schema)
a_df = a_df.unionAll(a_temp3)

现在要修改,我正在尝试加入条件。

a_temp4 = sqlContext.createDataFrame([("state","state",2,444,555,666)],schema)
a_df = a_df.join(a_temp4, [(a_df.category_id == a_temp4.category_id) & (some other cond here)], how = "inner")

但是这段代码不起作用。我收到一个错误:

+--------+------------+------+----------+---------- -+----------------+--------+------------+------+--- -------+-----------+----------------+ |category|category_id|bucket|prop_count|event_count|accum_prop_count|category|category_id|bucket|prop_count|event_count|accum_prop_count| +--------+------------+------+----------+---------- -+----------------+--------+------------+------+--- -------+-----------+----------------+ |民族|状态| 2| 222| 444| 555|状态|状态| 2| 444| 555| 666| +--------+------------+------+----------+---------- -+----------------+--------+------------+------+--- -------+-----------+----------------+

如何解决这个问题?正确的输出应该有 2 行,第二行应该有更新的值

【问题讨论】:

    标签: python apache-spark dataframe sql-update


    【解决方案1】:

    1)。内部联接将从您的初始数据帧中删除行,如果您想拥有与 a_df(左侧)相同数量的行,则需要左联接。

    2)。如果您的列具有相同的名称,则== 条件将重复列,您可以改用列表。

    3)。我想“其他情况”指的是bucket

    4)。如果 a_temp4 存在,您希望保留该值(如果不存在,则连接会将其值设置为 null),psf.coalesce 允许您这样做

    import pyspark.sql.functions as psf
    a_df = a_df.join(a_temp4, ["category_id", "bucket"], how="leftouter").select(
        psf.coalesce(a_temp4.category, a_df.category).alias("category"), 
        "category_id", 
        "bucket", 
        psf.coalesce(a_temp4.prop_count, a_df.prop_count).alias("prop_count"), 
        psf.coalesce(a_temp4.event_count, a_df.event_count).alias("event_count"), 
        psf.coalesce(a_temp4.accum_prop_count, a_df.accum_prop_count).alias("accum_prop_count")
        )
    
    +--------+-----------+------+----------+-----------+----------------+
    |category|category_id|bucket|prop_count|event_count|accum_prop_count|
    +--------+-----------+------+----------+-----------+----------------+
    |   state|      state|     2|       444|        555|             666|
    |  nation|     nation|     1|       222|        444|             555|
    +--------+-----------+------+----------+-----------+----------------+
    

    如果您只使用单行数据框,则应考虑直接对更新进行编码,而不是使用连接:

    def update_col(category_id, bucket, col_name, col_val):
        return psf.when((a_df.category_id == category_id) & (a_df.bucket == bucket), col_val).otherwise(a_df[col_name]).alias(col_name)
    
    a_df.select(
        update_col("state", 2, "category", "nation"), 
        "category_id", 
        "bucket", 
        update_col("state", 2, "prop_count", 444), 
        update_col("state", 2, "event_count", 555), 
        update_col("state", 2, "accum_prop_count", 666)
    ).show()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-09-23
      • 2018-01-16
      • 1970-01-01
      • 2020-04-29
      • 2017-11-13
      • 2021-09-11
      • 2021-05-21
      • 2018-07-03
      相关资源
      最近更新 更多