【问题标题】:Strange behavior when using toDF() function to transfrom RDD to Dataframe in PySpark在 PySpark 中使用 toDF() 函数将 RDD 转换为 Dataframe 时的奇怪行为
【发布时间】:2018-11-01 09:50:53
【问题描述】:

我是 Spark 的新手。当我使用 toDF() 函数将 RDD 转换为数据帧时,它似乎计算了我之前编写的 map() 之类的所有转换函数。我想知道 PySpark 中的 toDF() 是转换还是动作。

我创建了一个简单的RDD并使用一个简单的函数来输出它的值,只是为了测试,并在map()之后使用toDF()。结果似乎部分地在 map 中运行该函数。当我展示数据帧的结果时,toDF() 就像转换一样,再次输出结果。

>>> a = sc.parallelize([(1,),(2,),(3,)])
>>> def f(x):
...     print(x[0])
...     return (x[0] + 1, )
...
>>> b = a.map(f).toDF(["id"])
2
1
>>> b = a.map(f).toDF(["id"]).show()
2
1
1
2
3
+---+
| id|
+---+
|  2|
|  3|
|  4|
+---+

谁能告诉我为什么 PySpark 中的 toDF() 函数既像动作又像转换?非常感谢。

PS:在 Scala 中,在我的例子中,toDF 的作用类似于转换。

【问题讨论】:

    标签: python apache-spark pyspark apache-spark-sql rdd


    【解决方案1】:

    这并不奇怪。由于您没有提供模式,Spark 必须根据数据推断它。如果RDD 是输入,它将调用SparkSession._createFromRDD 和随后的SparkSession._inferSchema,如果samplingRatio 缺失,will evaluate up to 100 row

    first = rdd.first()
    if not first:
        raise ValueError("The first row in RDD is empty, "
                         "can not infer schema")
    if type(first) is dict:
        warnings.warn("Using RDD of dict to inferSchema is deprecated. "
                      "Use pyspark.sql.Row instead")
    
    
    if samplingRatio is None:
        schema = _infer_schema(first, names=names)
        if _has_nulltype(schema):
            for row in rdd.take(100)[1:]:
                schema = _merge_type(schema, _infer_schema(row, names=names))
                if not _has_nulltype(schema):
                    break
            else:
                raise ValueError("Some of types cannot be determined by the "
                                 "first 100 rows, please try again with sampling")
    

    现在剩下的唯一难题是为什么它不能准确评估一条记录。毕竟在你的情况下first 不是空的,也不包含None

    这是因为first 是通过take 实现的,并且不保证将评估的项目的确切数量。如果第一个分区没有产生所需数量的项目,它将迭代地增加要扫描的分区数。详情请查看the implementation

    如果你想避免这种情况,你应该使用createDataFrame 并提供模式作为 DDL 字符串:

    spark.createDataFrame(a.map(f), "val: integer")
    

    或等效的StructType

    您不会在 Scala 对应项中找到任何类似的行为,因为它不使用 toDF 中的模式推断。它要么从 Encoder(使用 Scala 反射获取)中检索相应的模式,要么根本不允许转换。最接近的类似行为是对输入源的推断,例如 CSV or JSON

    spark.read.json(Seq("""{"foo": "bar"}""").toDS.map(x => { println(x); x }))
    

    【讨论】:

    • 非常感谢。这个答案正是我需要的。
    猜你喜欢
    • 2023-03-13
    • 2021-06-29
    • 2018-09-14
    • 1970-01-01
    • 2016-05-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-11-02
    相关资源
    最近更新 更多