【问题标题】:Sum of variable number of columns in PySparkPySpark 中可变列数的总和
【发布时间】:2018-08-08 18:52:19
【问题描述】:

我有一个像这样的 Spark DataFrame:

+-----+--------+-------+-------+-------+-------+-------+
| Type|Criteria|Value#1|Value#2|Value#3|Value#4|Value#5|
+-----+--------+-------+-------+-------+-------+-------+
|  Cat|       1|      1|      2|      3|      4|      5|
|  Dog|       2|      1|      2|      3|      4|      5|
|Mouse|       4|      1|      2|      3|      4|      5|
|  Fox|       5|      1|      2|      3|      4|      5|
+-----+--------+-------+-------+-------+-------+-------+

你可以用下面的代码重现它:

data = [('Cat', 1, 1, 2, 3, 4, 5),
        ('Dog', 2, 1, 2, 3, 4, 5),
        ('Mouse', 4, 1, 2, 3, 4, 5),
        ('Fox', 5, 1, 2, 3, 4, 5)]
columns = ['Type', 'Criteria', 'Value#1', 'Value#2', 'Value#3', 'Value#4', 'Value#5']
df = spark.createDataFrame(data, schema=columns)
df.show()

我的任务是添加 Total 列,该列是所有 Value 列的总和,其中 # 不超过该行的 Criteria。

在这个例子中:

  • 对于行'Cat':条件是1,所以Total 就是Value#1
  • 对于行'Dog':条件是2,所以TotalValue#1Value#2 的总和。
  • 对于行'Fox':条件是5,所以Total 是所有列的总和(Value#1Value#5)。

结果应该是这样的:

+-----+--------+-------+-------+-------+-------+-------+-----+
| Type|Criteria|Value#1|Value#2|Value#3|Value#4|Value#5|Total|
+-----+--------+-------+-------+-------+-------+-------+-----+
|  Cat|       1|      1|      2|      3|      4|      5|    1|
|  Dog|       2|      1|      2|      3|      4|      5|    3|
|Mouse|       4|      1|      2|      3|      4|      5|   10|
|  Fox|       5|      1|      2|      3|      4|      5|   15|
+-----+--------+-------+-------+-------+-------+-------+-----+

我可以使用 Python UDF 做到这一点,但是我的数据集很大,而且 Python UDF 由于序列化而速度很慢。我正在寻找纯 Spark 解决方案。

我正在使用 PySpark 和 Spark 2.1

【问题讨论】:

  • 在没有 udf 的情况下完成此操作的一种方法是将 Values 列转换为数组,然后使用 posexplode 分解数组,过滤分解值

标签: python apache-spark pyspark apache-spark-sql


【解决方案1】:

您可以通过user6910411轻松将解决方案调整为PySpark: compute row maximum of the subset of columns and add to an exisiting dataframe

from pyspark.sql.functions import col, when

total = sum([
    when(col("Criteria") >= i, col("Value#{}".format(i))).otherwise(0)
    for i in range(1, 6)
])

df.withColumn("total", total).show()

# +-----+--------+-------+-------+-------+-------+-------+-----+
# | Type|Criteria|Value#1|Value#2|Value#3|Value#4|Value#5|total|
# +-----+--------+-------+-------+-------+-------+-------+-----+
# |  Cat|       1|      1|      2|      3|      4|      5|    1|
# |  Dog|       2|      1|      2|      3|      4|      5|    3|
# |Mouse|       4|      1|      2|      3|      4|      5|   10|
# |  Fox|       5|      1|      2|      3|      4|      5|   15|
# +-----+--------+-------+-------+-------+-------+-------+-----+

为任意一组顺序列定义一个list

cols = df.columns[2:]

并将总计重新定义为:

total_ = sum([
    when(col("Criteria") > i, col(cols[i])).otherwise(0)
    for i in range(len(cols))
])

df.withColumn("total", total_).show()
# +-----+--------+-------+-------+-------+-------+-------+-----+
# | Type|Criteria|Value#1|Value#2|Value#3|Value#4|Value#5|total|
# +-----+--------+-------+-------+-------+-------+-------+-----+
# |  Cat|       1|      1|      2|      3|      4|      5|    1|
# |  Dog|       2|      1|      2|      3|      4|      5|    3|
# |Mouse|       4|      1|      2|      3|      4|      5|   10|
# |  Fox|       5|      1|      2|      3|      4|      5|   15|
# +-----+--------+-------+-------+-------+-------+-------+-----+

重要

这里sum__builtin__.sum 而不是pyspark.sql.functions.sum

【讨论】:

  • @Sergei 没有。 sum 只是减少了集合调用 __add__ 方法。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2020-12-11
  • 1970-01-01
  • 2018-10-23
  • 2022-10-24
  • 2015-11-04
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多