【问题标题】:How to find average of a array column based on index in pyspark如何根据pyspark中的索引查找数组列的平均值
【发布时间】:2019-06-21 00:20:17
【问题描述】:

我有如下数据

-----------------------------
place  | key        | weights
----------------------------
amazon | lion       | [ 34, 23, 56 ]
north  | bear       | [ 90, 45]
amazon | lion       | [ 38, 30, 50 ]
amazon | bear       | [ 45 ]
amazon | bear       | [ 40 ]

我试图得到如下结果

-----------------------------
place  | key        | average
----------------------------
amazon | lion1      | 36.0      #(34 + 38)/2
amazon | lion2      | 26.5      #(23 + 30)/2
amazon | lion3      | 53.0      #(50 + 56)/2
north  | bear1      | 90        #(90)/1
north  | bear2      | 45        #(45)/1
amazon | bear1      | 42.5      #(45 + 40)/2

我明白了,首先我必须对列 placekey 进行分组,然后我必须根据索引对数组元素取平均值。 例如,lion1 是数组 [ 34, 23, 56 ][ 38, 30, 50 ] 中的第一个索引元素。

我已经有一个使用posexplode的解决方案,但问题是在实际数据中weights数组列大小非常高,因为posexplode增加了更多行,数据大小从1000万行大幅增加到12亿行并且无法在当前集群上以可靠的时间进行计算。

我认为添加比行更多的列然后取消透视列更好,但我不知道如何使用 pyspark 或 spark SQL 2.2.1 来实现。

【问题讨论】:

    标签: apache-spark hive pyspark apache-spark-sql pyspark-sql


    【解决方案1】:

    一种选择是将给定地点的所有arrays,组合键合并到一个数组中。在这个数组数组中,您可以使用 udf 来计算所需的平均值,最后使用 posexplode 来获得所需的结果。

    from pyspark.sql.functions import collect_list,udf,posexplode,concat
    from pyspark.sql.types import ArrayType,DoubleType
    
    #Grouping by place,key to get an array of arrays
    grouped_df = df.groupBy(df.place,df.key).agg(collect_list(df.weights).alias('all_weights'))
    
    #Define UDF
    zip_mean = udf(lambda args: [sum(i)/len(i) for i in zip(*args)],ArrayType(DoubleType()))
    
    #Apply UDF on the array of array column
    res = grouped_df.select('*',zip_mean(grouped_df.all_weights).alias('average'))
    
    #POS explode to explode the average values and get the position for key concatenation
    res = res.select('*',posexplode(res.average))
    
    #Final result
    res.select(res.place,concat(res.key,res.pos+1).alias('key'),res.col).show()
    

    【讨论】:

    • 感谢您的解决方案,UDF 是否有转机,我尝试使用一些 hive 函数进行检查,但没有发现可以在 python 中执行 zip 函数。再次感谢您。
    • 我可以知道对此解决方案投反对票的原因吗,因为我正在考虑考虑它,因为在对两个解决方案进行基准测试后,这个解决方案在 26 分钟内运行,而另一个解决方案在 36 分钟内运行对于 4GB 数据,但是这个解决方案有更多的随机内存溢出。
    【解决方案2】:

    您可以通过functions.size() 找到数组列中的最大元素数,然后展开该列:

    1. 设置数据

      from pyspark.sql import functions as F
      
      df = spark.createDataFrame([    
            ('amazon', 'lion', [ 34, 23, 56 ])
          , ('north',  'bear', [ 90, 45])
          , ('amazon', 'lion', [ 38, 30, 50 ])
          , ('amazon', 'bear', [ 45 ])    
          , ('amazon', 'bear', [ 40 ])
      ], ['place', 'key', 'average'])
      
    2. 在数组字段'average'中查找最大元素数

      n = df.select(F.max(F.size('average')).alias('n')).first().n
      
      >>> n
      3
      
    3. 将数组列转换为 n 列

      df1 = df.select('place', 'key', *[F.col('average')[i].alias('val_{}'.format(i+1)) for i in range(n)])
      
      >>> df1.show()
      +------+----+-----+-----+-----+
      | place| key|val_1|val_2|val_3|
      +------+----+-----+-----+-----+
      |amazon|lion|   34|   23|   56|
      | north|bear|   90|   45| null|
      |amazon|lion|   38|   30|   50|
      |amazon|bear|   45| null| null|
      |amazon|bear|   40| null| null|
      +------+----+-----+-----+-----+
      
    4. 计算新列的平均聚合

      df2 = df1.groupby('place', 'key').agg(*[ F.mean('val_{}'.format(i+1)).alias('average_{}'.format(i+1)) for i in range(n)])
      
      >>> df2.show()
      +------+----+---------+---------+---------+
      | place| key|average_1|average_2|average_3|
      +------+----+---------+---------+---------+
      |amazon|bear|     42.5|     null|     null|
      | north|bear|     90.0|     45.0|     null|
      |amazon|lion|     36.0|     26.5|     53.0|
      +------+----+---------+---------+---------+
      
    5. 使用 select + union + reduce 取消透视列

      from functools import reduce
      
      df_new = reduce(lambda x,y: x.union(y), [
          df2.select('place', F.concat('key', F.lit(i+1)).alias('key'), F.col('average_{}'.format(i+1)).alias('average')) \
             .dropna(subset=['average']) for i in range(n)
      ])
      
      >>> df_new.show()
      +------+-----+-------+
      | place|  key|average|
      +------+-----+-------+
      |amazon|bear1|   42.5|
      | north|bear1|   90.0|
      |amazon|lion1|   36.0|
      | north|bear2|   45.0|
      |amazon|lion2|   26.5|
      |amazon|lion3|   53.0|
      +------+-----+-------+
      

    【讨论】:

    • 非常感谢您的解决方案,我注意到的一个问题是,如果n 的值是 252,分区数是 12,那么创建的任务数非常高,大约 54270,有 257 个阶段。我认为这是因为联合操作运行了 252 次并减慢了整个过程。有没有更好的方法来反透视数据。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-06-29
    • 1970-01-01
    • 2020-05-25
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多