【问题标题】:Apache Spark: Percentile of list of row values in dataframeApache Spark:数据框中行值列表的百分比
【发布时间】:2017-10-03 00:15:28
【问题描述】:

我有一个带有一组计算列的 Apache Spark 数据框。对于数据框中的每一行(大约 2000 行),我希望获取 10 列的行值并找到第 11 列相对于其他 10 列的最接近的值。

我想我会采用这些行值并将其转换为列表,然后使用 abs 值计算来确定最接近的值。

但我被困在如何将行值转换为列表的问题上。我已经取出一列并使用 collect_list 将这些值转换为列表,但不确定当列表来自单行和多列时如何处理。

【问题讨论】:

    标签: list dataframe pyspark


    【解决方案1】:

    你应该explode你的列,这样你就可以线性化你的计算。

    让我们创建一个示例数据框:

    import numpy as np
    np.random.seed(0)
    df = sc.parallelize([np.random.randint(0, 10, 11).tolist() for _ in range(20)])\
        .toDF(["col" + str(i) for i in range(1, 12)])
    df.show()
        +----+----+----+----+----+----+----+----+----+-----+-----+
        |col1|col2|col3|col4|col5|col6|col7|col8|col9|col10|col11|
        +----+----+----+----+----+----+----+----+----+-----+-----+
        |   5|   0|   3|   3|   7|   9|   3|   5|   2|    4|    7|
        |   6|   8|   8|   1|   6|   7|   7|   8|   1|    5|    9|
        |   8|   9|   4|   3|   0|   3|   5|   0|   2|    3|    8|
        |   1|   3|   3|   3|   7|   0|   1|   9|   9|    0|    4|
        |   7|   3|   2|   7|   2|   0|   0|   4|   5|    5|    6|
        |   8|   4|   1|   4|   9|   8|   1|   1|   7|    9|    9|
        |   3|   6|   7|   2|   0|   3|   5|   9|   4|    4|    6|
        |   4|   4|   3|   4|   4|   8|   4|   3|   7|    5|    5|
        |   0|   1|   5|   9|   3|   0|   5|   0|   1|    2|    4|
        |   2|   0|   3|   2|   0|   7|   5|   9|   0|    2|    7|
        |   2|   9|   2|   3|   3|   2|   3|   4|   1|    2|    9|
        |   1|   4|   6|   8|   2|   3|   0|   0|   6|    0|    6|
        |   3|   3|   8|   8|   8|   2|   3|   2|   0|    8|    8|
        |   3|   8|   2|   8|   4|   3|   0|   4|   3|    6|    9|
        |   8|   0|   8|   5|   9|   0|   9|   6|   5|    3|    1|
        |   8|   0|   4|   9|   6|   5|   7|   8|   8|    9|    2|
        |   8|   6|   6|   9|   1|   6|   8|   8|   3|    2|    3|
        |   6|   3|   6|   5|   7|   0|   8|   4|   6|    5|    8|
        |   2|   3|   9|   7|   5|   3|   4|   5|   3|    3|    7|
        |   9|   9|   9|   7|   3|   2|   3|   9|   7|    7|    5|
        +----+----+----+----+----+----+----+----+----+-----+-----+
    

    有几种方法可以将行值转换为列表:

    • 创建一个map,其键等于列名,值等于相应的行值。

      import pyspark.sql.functions as psf
      from itertools import chain
      df = df\
          .withColumn("id", psf.monotonically_increasing_id())\
          .select(
              "id", 
              psf.posexplode(
                  psf.create_map(list(chain(*[(psf.lit(c), psf.col(c)) for c in df.columns if c != "col11"])))
              ).alias("pos", "col_name", "value"), "col11")
      df.show()
          +---+---+--------+-----+-----+
          | id|pos|col_name|value|col11|
          +---+---+--------+-----+-----+
          |  0|  0|    col1|    5|    7|
          |  0|  1|    col2|    0|    7|
          |  0|  2|    col3|    3|    7|
          |  0|  3|    col4|    3|    7|
          |  0|  4|    col5|    7|    7|
          |  0|  5|    col6|    9|    7|
          |  0|  6|    col7|    3|    7|
          |  0|  7|    col8|    5|    7|
          |  0|  8|    col9|    2|    7|
          |  0|  9|   col10|    4|    7|
          |  1|  0|    col1|    6|    9|
          |  1|  1|    col2|    8|    9|
          |  1|  2|    col3|    8|    9|
          |  1|  3|    col4|    1|    9|
          |  1|  4|    col5|    6|    9|
          |  1|  5|    col6|    7|    9|
          |  1|  6|    col7|    7|    9|
          |  1|  7|    col8|    8|    9|
          |  1|  8|    col9|    1|    9|
          |  1|  9|   col10|    5|    9|
          +---+---+--------+-----+-----+
      
    • ArrayType 中使用StructType

      df = df\
          .withColumn("id", psf.monotonically_increasing_id())\
          .select(
              "id", 
              psf.explode(
                  psf.array([psf.struct(psf.lit(c).alias("col_name"), psf.col(c).alias("value")) 
                             for c in df.columns if c != "col11"])).alias("cols"), 
              "col11").select("cols.*", "col11", "id")
      df.show()
          +--------+-----+-----+---+
          |col_name|value|col11| id|
          +--------+-----+-----+---+
          |    col1|    5|    7|  0|
          |    col2|    0|    7|  0|
          |    col3|    3|    7|  0|
          |    col4|    3|    7|  0|
          |    col5|    7|    7|  0|
          |    col6|    9|    7|  0|
          |    col7|    3|    7|  0|
          |    col8|    5|    7|  0|
          |    col9|    2|    7|  0|
          |   col10|    4|    7|  0|
          |    col1|    6|    9|  1|
          |    col2|    8|    9|  1|
          |    col3|    8|    9|  1|
          |    col4|    1|    9|  1|
          |    col5|    6|    9|  1|
          |    col6|    7|    9|  1|
          |    col7|    7|    9|  1|
          |    col8|    8|    9|  1|
          |    col9|    1|    9|  1|
          |   col10|    5|    9|  1|
          +--------+-----+-----+---+
      
    • 使用ArrayType...

    一旦你有一个分解列表,你可以寻找|col11 - value|的最小值:

    from pyspark.sql import Window
    w = Window.partitionBy("id").orderBy(psf.abs(psf.col("col11") - psf.col("value")))
    res = df.withColumn("rn", psf.row_number().over(w)).filter("rn = 1")
    res.sort("id").show()
        +--------+-----+-----+----------+---+
        |col_name|value|col11|        id| rn|
        +--------+-----+-----+----------+---+
        |    col5|    7|    7|         0|  1|
        |    col2|    8|    9|         1|  1|
        |    col1|    8|    8|         2|  1|
        |    col2|    3|    4|         3|  1|
        |    col1|    7|    6|         4|  1|
        |    col5|    9|    9|         5|  1|
        |    col2|    6|    6|         6|  1|
        |   col10|    5|    5|         7|  1|
        |    col3|    5|    4|         8|  1|
        |    col6|    7|    7|         9|  1|
        |    col2|    9|    9|8589934592|  1|
        |    col3|    6|    6|8589934593|  1|
        |    col3|    8|    8|8589934594|  1|
        |    col2|    8|    9|8589934595|  1|
        |    col2|    0|    1|8589934596|  1|
        |    col2|    0|    2|8589934597|  1|
        |    col9|    3|    3|8589934598|  1|
        |    col7|    8|    8|8589934599|  1|
        |    col4|    7|    7|8589934600|  1|
        |    col4|    7|    5|8589934601|  1|
        +--------+-----+-----+----------+---+
    

    【讨论】:

    • 谢谢。因此,一旦我有一列包含这些值的列表,是否可以计算该列表的百分位数?
    • 可以直接用窗函数求差值绝对值的最小值。
    猜你喜欢
    • 1970-01-01
    • 2018-01-22
    • 2023-03-31
    • 1970-01-01
    • 2022-01-20
    • 1970-01-01
    • 2012-09-16
    • 1970-01-01
    • 2019-09-29
    相关资源
    最近更新 更多