【问题标题】:TypeError: Column is not iterable - How to iterate over ArrayType()?TypeError:列不可迭代 - 如何迭代 ArrayType()?
【发布时间】:2018-08-06 04:49:07
【问题描述】:

考虑以下DataFrame:

+------+-----------------------+
|type  |names                  |
+------+-----------------------+
|person|[john, sam, jane]      |
|pet   |[whiskers, rover, fido]|
+------+-----------------------+

可以使用以下代码创建:

import pyspark.sql.functions as f
data = [
    ('person', ['john', 'sam', 'jane']),
    ('pet', ['whiskers', 'rover', 'fido'])
]

df = sqlCtx.createDataFrame(data, ["type", "names"])
df.show(truncate=False)

有没有办法通过对每个元素应用函数来直接修改ArrayType()"names",而不使用udf

例如,假设我想将函数foo 应用于"names" 列。 (我将使用foostr.upper 的示例仅用于说明目的,但我的问题是关于可应用于可迭代元素的任何有效函数。)

foo = lambda x: x.upper()  # defining it as str.upper as an example
df.withColumn('X', [foo(x) for x in f.col("names")]).show()

TypeError: 列不可迭代

我可以使用udf

foo_udf = f.udf(lambda row: [foo(x) for x in row], ArrayType(StringType()))
df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
#+------+-----------------------+
#|type  |names                  |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE]      |
#|pet   |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+

在这个具体的例子中,我可以通过爆炸列来避免udf,调用pyspark.sql.functions.upper(),然后调用groupBycollect_list

df.select('type', f.explode('names').alias('name'))\
    .withColumn('name', f.upper(f.col('name')))\
    .groupBy('type')\
    .agg(f.collect_list('name').alias('names'))\
    .show(truncate=False)
#+------+-----------------------+
#|type  |names                  |
#+------+-----------------------+
#|person|[JOHN, SAM, JANE]      |
#|pet   |[WHISKERS, ROVER, FIDO]|
#+------+-----------------------+

但这需要很多代码来做一些简单的事情。有没有更直接的方法来使用 spark-dataframe 函数迭代 ArrayType() 的元素?

【问题讨论】:

    标签: apache-spark pyspark spark-dataframe pyspark-sql


    【解决方案1】:

    Spark 中,您可以使用用户定义的函数:

    from pyspark.sql.functions import udf
    from pyspark.sql.types import ArrayType, DataType, StringType
    
    def transform(f, t=StringType()):
        if not isinstance(t, DataType):
           raise TypeError("Invalid type {}".format(type(t)))
        @udf(ArrayType(t))
        def _(xs):
            if xs is not None:
                return [f(x) for x in xs]
        return _
    
    foo_udf = transform(str.upper)
    
    df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
    
    +------+-----------------------+
    |type  |names                  |
    +------+-----------------------+
    |person|[JOHN, SAM, JANE]      |
    |pet   |[WHISKERS, ROVER, FIDO]|
    +------+-----------------------+
    

    考虑到 explode + collect_list 成语的高成本,这种方法几乎完全是首选,尽管它的内在成本。

    Spark 2.4 或更高版本中,您可以将transform* 与upper 一起使用(参见SPARK-23909):

    from pyspark.sql.functions import expr
    
    df.withColumn(
        'names', expr('transform(names, x -> upper(x))')
    ).show(truncate=False)
    
    +------+-----------------------+
    |type  |names                  |
    +------+-----------------------+
    |person|[JOHN, SAM, JANE]      |
    |pet   |[WHISKERS, ROVER, FIDO]|
    +------+-----------------------+
    

    也可以使用pandas_udf

    from pyspark.sql.functions import pandas_udf, PandasUDFType
    
    def transform_pandas(f, t=StringType()):
        if not isinstance(t, DataType):
           raise TypeError("Invalid type {}".format(type(t)))
        @pandas_udf(ArrayType(t), PandasUDFType.SCALAR)
        def _(xs):
            return xs.apply(lambda xs: [f(x) for x in xs] if xs is not None else xs)
        return _
    
    foo_udf_pandas = transform_pandas(str.upper)
    
    df.withColumn('names', foo_udf(f.col('names'))).show(truncate=False)
    
    +------+-----------------------+
    |type  |names                  |
    +------+-----------------------+
    |person|[JOHN, SAM, JANE]      |
    |pet   |[WHISKERS, ROVER, FIDO]|
    +------+-----------------------+
    

    尽管只有最新的 Arrow / PySpark 组合支持处理ArrayType 列(SPARK-24259SPARK-21187)。尽管如此,在支持任意 Python 函数的同时,此选项应该比标准 UDF 更有效(尤其是具有较低的 serde 开销)。


    * A number of other higher order functions are also supported,包括但不限于filteraggregate。见例子

    【讨论】:

    • @pault 感谢您的编辑。 Spark's own SQL docs 可能是一个更好的目标(虽然没有按类别分组) - 你怎么看?
    【解决方案2】:

    是的,您可以将其转换为 RDD,然后再转换回 DF。

    >>> df.show(truncate=False)
    +------+-----------------------+
    |type  |names                  |
    +------+-----------------------+
    |person|[john, sam, jane]      |
    |pet   |[whiskers, rover, fido]|
    +------+-----------------------+
    
    >>> df.rdd.mapValues(lambda x: [y.upper() for y in x]).toDF(["type","names"]).show(truncate=False)
    +------+-----------------------+
    |type  |names                  |
    +------+-----------------------+
    |person|[JOHN, SAM, JANE]      |
    |pet   |[WHISKERS, ROVER, FIDO]|
    +------+-----------------------+
    

    【讨论】:

    • 感谢您的回复。我也知道这种方法,但我一直在寻找仅使用 spark-dataframe 语法的东西。你知道 rdd 和 back 的序列化与使用 udf 相比如何吗?我的理解是首选使用 udf,但我没有文档支持。
    • 据我所知,一旦数据进入 Python,Spark 就无法管理 worker 的内存。 JVM 和 Python 都在一台机器上竞争内存,从而创建资源约束,这可能导致工作进程失败。
    猜你喜欢
    • 2021-10-25
    • 2019-04-01
    • 2021-08-31
    • 2018-02-10
    • 2021-11-05
    • 2013-09-01
    • 2020-09-07
    • 1970-01-01
    • 2017-08-27
    相关资源
    最近更新 更多