【发布时间】:2018-08-06 04:49:07
【问题描述】:
考虑以下DataFrame:
+------+-----------------------+
|type |names |
+------+-----------------------+
|person|[john, sam, jane] |
|pet |[whiskers, rover, fido]|
+------+-----------------------+
可以使用以下代码创建:
import pyspark.sql.functions as f
data = [
('person', ['john', 'sam', 'jane']),
('pet', ['whiskers', 'rover', 'fido'])
]
df = sqlCtx.createDataFrame(data, ["type", "names"])
df.show(truncate=False)
有没有办法通过对每个元素应用函数来直接修改ArrayType() 列"names",而不使用udf?
例如,假设我想将函数foo 应用于"names" 列。 (我将使用foo 为str.upper 的示例仅用于说明目的,但我的问题是关于可应用于可迭代元素的任何有效函数。)
foo = lambda x: x.upper() # defining it as str.upper as an example
df.withColumn('X', [foo(x) for x in f.col("names")]).show()
TypeError: 列不可迭代
我可以使用udf:
foo_udf = f.udf(lambda row: [foo(x) for x in row], ArrayType(StringType()))
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
#+------+-----------------------+
#|type |names |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE] |
#|pet |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+
在这个具体的例子中,我可以通过爆炸列来避免udf,调用pyspark.sql.functions.upper(),然后调用groupBy和collect_list:
df.select('type', f.explode('names').alias('name'))\
.withColumn('name', f.upper(f.col('name')))\
.groupBy('type')\
.agg(f.collect_list('name').alias('names'))\
.show(truncate=False)
#+------+-----------------------+
#|type |names |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE] |
#|pet |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+
但这需要很多代码来做一些简单的事情。有没有更直接的方法来使用 spark-dataframe 函数迭代 ArrayType() 的元素?
【问题讨论】:
标签: apache-spark pyspark spark-dataframe pyspark-sql