【问题标题】:PySpark flatmap should return tuples with typed valuesPySpark 平面图应返回具有类型值的元组
【发布时间】:2016-09-10 13:10:45
【问题描述】:

我正在使用带有 PySpark 的 Jupyter Notebook。在其中,我有一个数据框,该数据框具有包含这些列的列名和类型(整数,...)的模式。现在我使用 flatMap 之类的方法,但这会返回一个不再具有固定类型的元组列表。有没有办法做到这一点?

df.printSchema()
root
 |-- name: string (nullable = true)
 |-- ...
 |-- ...
 |-- ratings: integer (nullable = true)

然后我使用 flatMap 对评分值进行一些计算(此处混淆):

df.flatMap(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings))
y_rate.toDF().printSchema()

现在我得到一个错误:

TypeError:无法推断类型的架构:

有没有办法通过保留架构来使用 map/flatMap/reduce?或者至少返回具有特定类型值的元组?

【问题讨论】:

    标签: python pyspark namedtuple flatmap


    【解决方案1】:

    首先,您使用了错误的功能。 flatMapmapflatten 因此假设您的数据如下所示:

    df = sc.parallelize([("foo", 0), ("bar", 10)]).toDF(["id", "ratings"])
    

    flatMap 的输出将等同于:

    sc.parallelize(['foo', 0, 'bar', 5])
    

    因此您会看到错误。如果你真的想让它工作,你应该使用map:

    df.rdd.map(lambda row: (row.id, 5 if (row.ratings > 5) else row.ratings)).toDF()
    ## DataFrame[_1: string, _2: bigint]
    

    接下来,在 2.0 中不再支持对 DataFrame 的映射。您应该先提取rdd(参见上面的df.rdd.map)。

    最后在 Python 和 JVM 之间传递数据是极其低效的。它不仅需要在 Python 和 JVM 之间通过相应的序列化/反序列化和模式推断(如果未显式提供模式)来传递数据,这也打破了惰性。对于这样的事情,最好使用 SQL 表达式:

    from pyspark.sql.functions import when
    
    df.select(df.id, when(df.ratings > 5, 5).otherwise(df.ratings))
    

    如果出于某种原因您需要纯 Python 代码,UDF 可能是更好的选择。

    【讨论】:

    • 非常有帮助。感谢您的示例代码。我只是没有得到 flatMap vs Map 的部分。
    • flatMap 是一个函数RDD[T] => (T => Iterable[U]) => RDD[U]。换句话说,它期望函数返回Itereble(Python 元组是)并将这些(展平)结果连接起来。
    • 有没有办法在该语句中为何时/否则列命名?见df.select(df.id, when(df.ratings > 5, 5).otherwise(df.ratings))@zero323
    • 是的,您可以使用alias 例如:when(df.ratings > 5, 5).otherwise(df.ratings).alias("foo")
    猜你喜欢
    • 2016-08-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-04-04
    • 2020-09-23
    • 1970-01-01
    • 1970-01-01
    • 2015-11-20
    相关资源
    最近更新 更多