【问题标题】:How to read a column from Pyspark RDD and apply UDF on it?如何从 Pyspark RDD 中读取一列并在其上应用 UDF?
【发布时间】:2020-06-05 23:37:24
【问题描述】:

我通过在 Pyspark 中读取 csv 文件然后转换为 RDD 以应用 UDF 来创建 DF。应用 UDF 时会引发错误。

这是我的代码 sn-p -

# My UDF definition
def my_udf(string_array):
    // some code //
    return float_var

spark.udf.register("my_udf", my_udf, FloatType())

#Read from csv file
read_data=spark.read.format("csv").load("/path/to/file/part-*.csv", header="true")

rdd = read_data.rdd

get_df = rdd.map(lambda x: (x[0], x[1], my_udf(x[2]))).toDF(["col1", "col2","col3"])

read_data DF 中的样本数据 -

[Row(Id='ABCD505936', some_string='XCOYNZGAE', array='[0, 2, 5, 6, 8, 10, 12, 13, 14, 15]')]

通过读取 CSV 文件创建的 DF 的架构 -

print (read_data.schema)
StructType(List(StructField(col1,StringType,true),StructField(col2,StringType,true),StructField(col3,StringType,true)))

在 get_df 行应用 UDF 时出现以下错误 -

Traceback(最近一次调用最后一次):文件“”,第 1 行,in 文件“/usr/lib/spark/python/pyspark/sql/session.py”,行 58、在toDF中 返回 sparkSession.createDataFrame(self, schema, sampleRatio) 文件“/usr/lib/spark/python/pyspark/sql/session.py”,第 746 行,在 创建数据帧 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio) 文件 "/usr/lib/spark/python/pyspark/sql/session.py", 第 390 行,在 _createFromRDD 中 struct = self._inferSchema(rdd, samplingRatio, names=schema) 文件“/usr/lib/spark/python/pyspark/sql/session.py”,第 377 行,在 _inferSchema raise ValueError("有些类型不能由 " ValueError 确定:有些类型不能由前 100 行确定, 请用采样重试

谁能帮我将数组(数据类型为字符串)传递给 UDF?

【问题讨论】:

  • 为什么使用 RDD 而不是数据框?
  • 嗨@cricket_007,我承认我是 Pyspark 的新手,有人建议我在应用 UDF 时应该将 DF 转换为 RDD,以便我们可以使用 map 使执行并行而不是串行使用 DF。如果这种理解不正确,请纠正我,这对我很有帮助。
  • UDF 是一个 SparkSQL 概念,只能在 DataFrames 中使用。在 RDD 术语中,它们只是方法。
  • 嗨@cricket_007,所以没有什么比使用RDD的“map”功能并行执行UDF(方法)更好的了。
  • map() 是数据帧和 RDD 的一种方法。两者并行运行。为了使用 UDF,您需要通过 SparkSQL API - docs.databricks.com/spark/latest/spark-sql/udf-python.html

标签: apache-spark pyspark


【解决方案1】:

两件事:

  1. 如果将 DF 转换为 RDD,则无需将 my_udf 注册为 udf。注册udf的话直接申请df就好了read_data.withColumn("col3", my_udf(F.col("col3")))

  2. 您遇到的问题是在 toDF 步骤,从 RDD 转换时您没有指定新 DF 的架构,并且 spark 试图从示例数据中推断类型,但在您的情况下,隐式类型提示是不工作。我将手动创建架构并像这样传递到 toDF

from pyspark.sql.types import StringType, FloatType, StructField, StructType
get_schema = StructType(
[StructField('col1', StringType(), True),
 StructField('col2', StringType(), True)
 StructField('col3', FloatType(), True)]
)
get_df = rdd.map(lambda x: (x[0], x[1], my_udf(x[2]))).toDF(get_schema)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-05-21
    • 1970-01-01
    • 2021-02-17
    • 2019-12-27
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多