【问题标题】:how to create & sort by an ordered categorical variable in pyspark如何在pyspark中按有序分类变量创建和排序
【发布时间】:2020-05-26 05:21:14
【问题描述】:

我正在将一些代码从 pandas 迁移到 pyspark。我的源数据框如下所示:

   a         b  c
0  1    insert  1
1  2    update  1
2  3      seed  1
3  4    insert  2
4  5    update  2
5  6    delete  2
6  7  snapshot  1

我正在应用的操作(在 python / pandas 中)是:

df.b = pd.Categorical(df.b, ordered=True, categories=['insert', 'seed', 'update', 'snapshot', 'delete'])    
df.sort_values(['c', 'b'])

导致输出数据框:

   a         b  c
0  1    insert  1
2  3      seed  1
1  2    update  1
6  7  snapshot  1
3  4    insert  2
4  5    update  2
5  6    delete  2

我不确定如何最好地使用 pyspark 设置有序分类,我最初的方法是使用 case-when 创建一个新列并随后尝试使用它:

df = df.withColumn(
    "_precedence",
    when(col("b") == "insert", 1)
    .when(col("b") == "seed", 2)
    .when(col("b") == "update", 3)
    .when(col("b") == "snapshot", 4)
    .when(col("b") == "delete", 5)
)

【问题讨论】:

    标签: dataframe pyspark categorical-data


    【解决方案1】:

    您可以使用地图:

    from pyspark.sql.functions import create_map, lit, col
    
    categories=['insert', 'seed', 'update', 'snapshot', 'delete']
    
    # per @HaleemurAli, adjusted the below list comprehension to create map
    map1 = create_map([val for (i, c) in enumerate(categories) for val in (c, lit(i))])
    #Column<b'map(insert, 0, seed, 1, update, 2, snapshot, 3, delete, 4)'>
    
    df.orderBy('c', map1[col('b')]).show()
    +---+---+--------+---+
    | id|  a|       b|  c|
    +---+---+--------+---+
    |  0|  1|  insert|  1|
    |  2|  3|    seed|  1|
    |  1|  2|  update|  1|
    |  6|  7|snapshot|  1|
    |  3|  4|  insert|  2|
    |  4|  5|  update|  2|
    |  5|  6|  delete|  2|
    +---+---+--------+---+
    

    要颠倒 b 列的顺序:df.orderBy('c', map1[col('b')].desc()).show()

    【讨论】:

    • 谢谢,虽然我希望能像在 pandas 中一样在本地创建有序分类,但这可能与我们今天将得到的一样接近。 sum 取消嵌套列表很慢,不被认为是好的做法,因此我将其替换为 [val for (i, c) in enumerate(categories) for val in (c, lit(i))]。您能否为未来的读者更新您的答案。
    【解决方案2】:

    您也可以使用 coalesce 和您的 when statements 来执行此操作。

    from pyspark.sql import functions as F
    
    categories=['insert', 'seed', 'update', 'snapshot', 'delete']
    
    cols=[(F.when(F.col("b")==x,F.lit(y))) for x,y in zip(categories,[x for x in (range(1, len(categories)+1))])]
    
    df.orderBy("c",F.coalesce(*cols)).show()
    
    #+---+--------+---+
    #|  a|       b|  c|
    #+---+--------+---+
    #|  1|  insert|  1|
    #|  3|    seed|  1|
    #|  2|  update|  1|
    #|  7|snapshot|  1|
    #|  4|  insert|  2|
    #|  5|  update|  2|
    #|  6|  delete|  2|
    #+---+--------+---+
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-02-15
      • 2019-08-08
      • 1970-01-01
      • 1970-01-01
      • 2016-03-12
      • 1970-01-01
      • 2019-06-04
      • 2023-03-17
      相关资源
      最近更新 更多