【问题标题】:Iterate over an array column in PySpark with map使用 map 遍历 PySpark 中的数组列
【发布时间】:2019-06-21 14:57:47
【问题描述】:

在 PySpark 中,我有一个由两列组成的数据框:

+-----------+----------------------+
| str1      | array_of_str         |
+-----------+----------------------+
| John      | [mango, apple, ...   |
| Tom       | [mango, orange, ...  |
| Matteo    | [apple, banana, ...  | 

我想添加一个列concat_result,其中包含array_of_str 内每个元素的连接str1 列内的字符串。

+-----------+----------------------+----------------------------------+
| str1      | array_of_str         | concat_result                    |
+-----------+----------------------+----------------------------------+
| John      | [mango, apple, ...   | [mangoJohn, appleJohn, ...       |
| Tom       | [mango, orange, ...  | [mangoTom, orangeTom, ...        |
| Matteo    | [apple, banana, ...  | [appleMatteo, bananaMatteo, ...  |

我正在尝试使用map 来遍历数组:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType, ArrayType

# START EXTRACT OF CODE
ret = (df
  .select(['str1', 'array_of_str'])
  .withColumn('concat_result', F.udf(
     map(lambda x: x + F.col('str1'), F.col('array_of_str')), ArrayType(StringType))
  )
)

return ret
# END EXTRACT OF CODE

但我得到错误:

TypeError: argument 2 to map() must support iteration

【问题讨论】:

标签: python apache-spark pyspark


【解决方案1】:

您只需稍作调整即可完成这项工作:

from pyspark.sql.types import StringType, ArrayType
from pyspark.sql.functions import udf, col

concat_udf = udf(lambda con_str, arr: [x + con_str for x in arr],
                   ArrayType(StringType()))
ret = df \
  .select(['str1', 'array_of_str']) \
  .withColumn('concat_result', concat_udf(col("str1"), col("array_of_str")))

ret.show()

你不需要使用map,标准列表理解就足够了。

【讨论】:

  • 唯一需要注意的是,如果 str1array_of_str 的任何值是 null,这将中断。您必须在 udf 中添加明确的错误检查。
猜你喜欢
  • 1970-01-01
  • 2011-05-24
  • 2018-05-22
  • 1970-01-01
  • 1970-01-01
  • 2020-05-26
  • 2021-04-26
  • 2023-01-12
  • 1970-01-01
相关资源
最近更新 更多