【Question title】: How to rank the column values based on a dictionary and retain the highest value?
【Posted】: 2022-01-27 13:55:44
【Question】:

Suppose I have a dataframe like the following:

| id |col
| 1  | "A,B,C"
| 2  | "D,C"
| 3  | "B,C,A"
| 4  | None

The dictionary is:

d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}

The output dataframe must be:

| id |col
| 1  | "A"
| 2  | "C"
| 3  | "A"
| 4  | None

【Comments】:

    Tags: python dataframe apache-spark pyspark apache-spark-sql


    【Solution 1】:

    The higher-order function transform can be used to rank the elements of col according to the dictionary; array_min then returns the element with the lowest rank.

    from pyspark.sql import functions as F
    from itertools import chain
    
    data = [(1, "A,B,C",),
            (2, "D,C",),
            (3, "B,C,A",),
            (4, None,), ]
    df = spark.createDataFrame(data, ("id", "col", ))
    
    d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}
    
    mapper = F.create_map([F.lit(c) for c in chain.from_iterable(d.items())])
    
    """
    Mapper has the value Column<'map(A, 1, B, 2, C, 3, D, 4)'>
    """
    
    (df.withColumn("col", F.split(F.col("col"), ",")) # Split string to create an array
      .withColumn("mapper", mapper) # Add the mapping column to the dataframe
      .withColumn("col", F.expr("transform(col, x -> struct(mapper[x] as rank, x as col))")) # Iterate over the array and look up each rank in mapper
      .withColumn("col", F.array_min(F.col("col")).col) # array_min compares structs field by field, so the lowest rank wins
    ).select("id", "col").show()
    
    """
    +---+----+
    | id| col|
    +---+----+
    |  1|   A|
    |  2|   C|
    |  3|   A|
    |  4|null|
    +---+----+
    """
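
To make the struct ordering easier to follow, here is a minimal plain-Python sketch of the same pipeline for a single row. It is only a model, not Spark code: `lowest_ranked` is a hypothetical helper, Python tuples stand in for Spark structs, and it assumes every letter in col is a key of d (a missing key raises KeyError here, whereas the Spark map lookup behaves differently).

```python
from itertools import chain

d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}

# chain.from_iterable flattens the (key, value) pairs into the alternating
# key, value, key, value, ... sequence that is passed to F.create_map above.
flat = list(chain.from_iterable(d.items()))


def lowest_ranked(col, ranks):
    """Plain-Python model of split -> transform -> array_min for one row."""
    if col is None:  # a null input stays null, as in the Spark output
        return None
    # Mirrors transform(col, x -> struct(mapper[x] as rank, x as col)).
    pairs = [(ranks[x], x) for x in col.split(",")]
    # Tuples, like structs, compare on the first field before the second.
    return min(pairs)[1]


rows = [(1, "A,B,C"), (2, "D,C"), (3, "B,C,A"), (4, None)]
result = [(i, lowest_ranked(c, d)) for i, c in rows]
print(flat)    # ['A', 1, 'B', 2, 'C', 3, 'D', 4]
print(result)  # [(1, 'A'), (2, 'C'), (3, 'A'), (4, None)]
```

Putting the rank first in the pair is what makes the minimum pick the lowest rank; with the letter first, the comparison would be alphabetical.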
    

    【Comments】:

    • The array_min function can be used instead of sorting and taking the first element of the array.
    【Solution 2】:

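    Here is another solution that relies on the same struct ordering as @Nithish's answer, but uses arrays_zip and array_min instead:

    1. Create an array of weights from the dict (ordered by key)
    2. Zip the weights array with the sorted result of splitting col
    3. Take the minimum of the zipped array of structs

    import pyspark.sql.functions as F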
    
    df = spark.createDataFrame([(1, "A,B,C"), (2, "D,C"), (3, "B,C,A"), (4, None)], ["id", "col"])
    d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}
    
    result = df.withColumn(
        "col",
        F.array_min(
            F.arrays_zip(
                F.array(*[F.lit(d[x]) for x in sorted(d)]), 
                F.array_sort(F.split("col", ","))
            )
        )["1"]
    )
    
    result.show()
    #+---+----+
    #| id| col|
    #+---+----+
    #|  1|   A|
    #|  2|   C|
    #|  3|   A|
    #|  4|null|
    #+---+----+
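
One thing worth checking with this approach: arrays_zip pairs elements by position rather than by key, so the result appears to be correct only when the weights increase in the same order as the sorted keys, as they do for d here. The plain-Python sketch below models the zip for a single row and compares it with a key-based lookup. The helpers `zip_min` and `lookup_min` and the dictionary `reversed_d` are hypothetical, introduced only for illustration, and the model assumes col never has more elements than d.

```python
from itertools import zip_longest


def zip_min(col, ranks):
    """Plain-Python model of array_min(arrays_zip(weights, array_sort(split(col))))."""
    if col is None:
        return None
    weights = [ranks[k] for k in sorted(ranks)]  # weights ordered by key
    letters = sorted(col.split(","))             # array_sort(split(col, ","))
    # arrays_zip pairs by position and pads the shorter array with nulls.
    zipped = list(zip_longest(weights, letters))
    # The smallest first field (weight) decides which pair is the minimum.
    return min(zipped, key=lambda pair: pair[0])[1]


def lookup_min(col, ranks):
    """Look each letter up by key instead of relying on position."""
    if col is None:
        return None
    return min(col.split(","), key=lambda x: ranks[x])


d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}
reversed_d = {'A': 4, 'B': 3, 'C': 2, 'D': 1}  # hypothetical dictionary

print(zip_min("D,C", d), lookup_min("D,C", d))                    # C C
print(zip_min("D,C", reversed_d), lookup_min("D,C", reversed_d))  # None D
```

With reversed_d the smallest weight lands on a padded slot, so the positional version returns nothing while the lookup returns D. For dictionaries that are not ordered like their keys, the map lookup from Solution 1 is probably the safer choice.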
    

    【Comments】:

      【Solution 3】:

      I assume you want to sort the letters according to the values given in the dictionary d.

      Then you can do the following:

      from pyspark.sql import Row
      from pyspark.sql import SparkSession
      import pyspark.sql.functions as F
      import pyspark.sql.types as T
      
      spark = SparkSession.builder.master("local").appName("sort_column_test").getOrCreate()
      
      df = spark.createDataFrame(data=(Row(1, "A,B,C",),
                                       Row(2, "D,C",),
                                       Row(3, "B,C,A",),
                                       Row(4, None)),
                                 schema="id:int, col:string")
      d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}
      
      # Define a sort UDF that sorts the array according to the dictionary 'd', also handles None arrays
      sort_udf = F.udf(lambda array: sorted(array,
                                            key=lambda x: d[x]) if array is not None else None,
                       T.ArrayType(T.StringType()))
      df = df.withColumn("col", sort_udf(F.split(F.col("col"), ",")).getItem(0))
      df.show()
      
      """
      +---+----+
      | id| col|
      +---+----+
      |  1|   A|
      |  2|   C|
      |  3|   A|
      |  4|null|
      +---+----+
      """
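
Because the UDF body is ordinary Python, its logic can be checked without a Spark session. The sketch below restates the lambda as a named function and adds a variant that uses min, on the assumption that only the first element is needed. Both function names are hypothetical, and every letter is assumed to be a key of d.

```python
d = {'A': 1, 'B': 2, 'C': 3, 'D': 4}


def sort_by_rank(array):
    """Same logic as the lambda passed to F.udf above."""
    if array is None:
        return None
    return sorted(array, key=lambda x: d[x])


def first_by_rank(array):
    """Only the first element is kept, so min() can replace the full sort."""
    if not array:  # covers both None and an empty list
        return None
    return min(array, key=lambda x: d[x])


samples = [["A", "B", "C"], ["D", "C"], ["B", "C", "A"], None]
print([sort_by_rank(s) for s in samples])
# [['A', 'B', 'C'], ['C', 'D'], ['A', 'B', 'C'], None]
print([first_by_rank(s) for s in samples])
# ['A', 'C', 'A', None]
```

If first_by_rank were registered with F.udf and a T.StringType() return type, it would return the letter directly and getItem(0) would no longer be needed. Either way, a Python UDF is typically slower than the built-in functions used in the other answers.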
      
      
      
      
      
      

      【Comments】:
