【问题标题】:row wise calculation on Spark [duplicate]Spark上的逐行计算[重复]
【发布时间】:2018-11-12 23:23:57
【问题描述】:

基于此answer 我需要做一些逐行计算

result= (reduce(add, (<some row wise calculation on col(x)> for x in df.columns[1:])) / n).alias("result")

但在此之前我需要按降序对行值进行排序(更改每行数据框中的列顺序?) 假设我有以下行

 3,7,21,9
 5,15,10,2

例如,我需要知道每一行的每个值的排名(顺序),然​​后计算总和(值/索引) 第一行

21 ->4,9->3,7->3,3->1,sum(21/4,9/3,7/3,3/1)

第二行

15->4,10->3,5->2,2->1,sum(15/4,10/4,5/2,2/1)

不是重复的,因为我需要排序不是按列而是按行

【问题讨论】:

  • @AndreaCorbellini 但我不需要根据列名而是根据每一行
  • 为什么在需要平均和聚合时需要排序?如果包含样本输入数据和预期输出会更清楚
  • @RameshMaharjan 虽然我确实需要聚合,但我的问题中没有任何内容暗示我需要平均值。我需要进行一些逐行计算,然后进行聚合
  • 不知道按每一行排序是什么意思。正如 Ramesh 建议的,请举个例子
  • @Yakov,为了让你的工作更轻松,我尝试了以下方法:)

标签: python apache-spark


【解决方案1】:

假设您的输入数据框如下

+----+----+----+----+
|col1|col2|col3|col4|
+----+----+----+----+
|3   |7   |21  |9   |
|5   |15  |10  |2   |
+----+----+----+----+

然后你可以写一个udf函数来得到你想要的输出列

from pyspark.sql import functions as f
from pyspark.sql import types as t
def sortAndIndex(list):
    return sorted([(value, index+1) for index, value in enumerate(sorted(list))],  reverse=True)

sortAndIndexUdf = f.udf(sortAndIndex, t.ArrayType(t.StructType([t.StructField('key', t.IntegerType(), True), t.StructField('value', t.IntegerType(), True)])))

df.withColumn('sortedAndIndexed', sortAndIndexUdf(f.array([x for x in df.columns])))

这应该给你

+----+----+----+----+----------------------------------+
|col1|col2|col3|col4|sortedAndIndexed                  |
+----+----+----+----+----------------------------------+
|3   |7   |21  |9   |[[21, 4], [9, 3], [7, 2], [3, 1]] |
|5   |15  |10  |2   |[[15, 4], [10, 3], [5, 2], [2, 1]]|
+----+----+----+----+----------------------------------+

更新

你评论为

我的计算应该是 sum(value/index) 所以可能使用你的 udf 函数我应该返回某种 reduce(add,)?

你可以这样做

from pyspark.sql import functions as f
from pyspark.sql import types as t
def divideAndSum(list):
    return sum([float(value)/(index+1) for index, value in enumerate(sorted(list))])

divideAndSumUdf = f.udf(divideAndSum, t.DoubleType())

df.withColumn('divideAndSum', divideAndSumUdf(f.array([x for x in df.columns])))

这应该给你

+----+----+----+----+------------------+
|col1|col2|col3|col4|divideAndSum      |
+----+----+----+----+------------------+
|3   |7   |21  |9   |14.75             |
|5   |15  |10  |2   |11.583333333333334|
+----+----+----+----+------------------+

【讨论】:

  • 谢谢,但假设我的计算应该是 sum(value/index) 所以可能使用你的 udf 函数我应该返回某种 reduce(add,)?
  • 最后,结果证明无论如何都是聚合。 ;) 是的,你是对的
  • 能否更新您的答案
  • @Yakov,我已经更新了答案
  • 是的,它需要用于排序和索引
猜你喜欢
  • 2019-01-29
  • 2021-05-08
  • 1970-01-01
  • 2021-12-05
  • 1970-01-01
  • 2020-10-27
  • 1970-01-01
  • 2013-04-08
  • 1970-01-01
相关资源
最近更新 更多