【发布时间】:2020-06-05 23:37:24
【问题描述】:
我通过在 Pyspark 中读取 csv 文件然后转换为 RDD 以应用 UDF 来创建 DF。应用 UDF 时会引发错误。
这是我的代码 sn-p -
# My UDF definition
def my_udf(string_array):
// some code //
return float_var
spark.udf.register("my_udf", my_udf, FloatType())
#Read from csv file
read_data=spark.read.format("csv").load("/path/to/file/part-*.csv", header="true")
rdd = read_data.rdd
get_df = rdd.map(lambda x: (x[0], x[1], my_udf(x[2]))).toDF(["col1", "col2","col3"])
read_data DF 中的样本数据 -
[Row(Id='ABCD505936', some_string='XCOYNZGAE', array='[0, 2, 5, 6, 8, 10, 12, 13, 14, 15]')]
通过读取 CSV 文件创建的 DF 的架构 -
print (read_data.schema)
StructType(List(StructField(col1,StringType,true),StructField(col2,StringType,true),StructField(col3,StringType,true)))
在 get_df 行应用 UDF 时出现以下错误 -
Traceback(最近一次调用最后一次):文件“”,第 1 行,in 文件“/usr/lib/spark/python/pyspark/sql/session.py”,行 58、在toDF中 返回 sparkSession.createDataFrame(self, schema, sampleRatio) 文件“/usr/lib/spark/python/pyspark/sql/session.py”,第 746 行,在 创建数据帧 rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio) 文件 "/usr/lib/spark/python/pyspark/sql/session.py", 第 390 行,在 _createFromRDD 中 struct = self._inferSchema(rdd, samplingRatio, names=schema) 文件“/usr/lib/spark/python/pyspark/sql/session.py”,第 377 行,在 _inferSchema raise ValueError("有些类型不能由 " ValueError 确定:有些类型不能由前 100 行确定, 请用采样重试
谁能帮我将数组(数据类型为字符串)传递给 UDF?
【问题讨论】:
-
为什么使用 RDD 而不是数据框?
-
嗨@cricket_007,我承认我是 Pyspark 的新手,有人建议我在应用 UDF 时应该将 DF 转换为 RDD,以便我们可以使用 map 使执行并行而不是串行使用 DF。如果这种理解不正确,请纠正我,这对我很有帮助。
-
UDF 是一个 SparkSQL 概念,只能在 DataFrames 中使用。在 RDD 术语中,它们只是方法。
-
嗨@cricket_007,所以没有什么比使用RDD的“map”功能并行执行UDF(方法)更好的了。
-
map()是数据帧和 RDD 的一种方法。两者并行运行。为了使用 UDF,您需要通过 SparkSQL API - docs.databricks.com/spark/latest/spark-sql/udf-python.html
标签: apache-spark pyspark