【发布时间】:2020-09-02 23:12:57
【问题描述】:
我正在编写一个流程,该流程需要为根据某些条件匹配的某些组生成 UUID。我让我的代码正常工作,但我担心在我的 UDF 中创建 UUID 的潜在问题(从而使其不确定)。下面是一些代码的简化示例来说明:
from uuid import uuid1
from pyspark.sql import SparkSession
from pyspark.sql.functions import PandasUDFType, pandas_udf
spark = (
SparkSession.builder.master("local")
.appName("Word Count")
.config("spark.some.config.option", "some-value")
.getOrCreate()
)
df = spark.createDataFrame([["j", 3], ["h", 3], ["a", 2]], ["name", "age"])
@pandas_udf("name string, age integer, uuid string", PandasUDFType.GROUPED_MAP)
def create_uuid(df):
df["uuid"] = str(uuid1())
return df
>>> df.groupby("age").apply(create_uuid).show()
+----+---+--------------------+
|name|age| uuid|
+----+---+--------------------+
| j| 3|1f8f48ac-0da8-430...|
| h| 3|1f8f48ac-0da8-430...|
| a| 2|d5206d03-bcce-445...|
+----+---+--------------------+
这目前适用于在 AWS Glue 上处理超过 20 万条记录的一些数据,我还没有发现任何错误。
我使用uuid1,因为它使用节点信息来生成 UUID,从而确保没有 2 个节点生成相同的 id。
我的一个想法是将 UDF 注册为非确定性:
udf = pandas_udf(
create_uuid, "name string, age integer, uuid string", PandasUDFType.GROUPED_MAP
).asNondeterministic()
但这给了我以下错误:
py4j.protocol.Py4JJavaError: An error occurred while calling o60.flatMapGroupsInPandas.
: org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in
Project, Filter, Aggregate or Window, found:
`age`,create_uuid(name, age),`name`,`age`,`uuid`
in operator FlatMapGroupsInPandas [age#1L], create_uuid(name#0, age#1L), [name#7, age#8, uuid#9]
;;
FlatMapGroupsInPandas [age#1L], create_uuid(name#0, age#1L), [name#7, age#8, uuid#9]
+- Project [age#1L, name#0, age#1L]
+- LogicalRDD [name#0, age#1L], false
我的问题是:
- 这可能会遇到哪些潜在问题?
- 如果它确实存在潜在问题,我可以在哪些方面做出确定性?
- 为什么不能将 GROUPED_MAP 函数标记为非确定性函数?
【问题讨论】:
-
顺便说一句,我已经阅读了stackoverflow.com/questions/42960920/…,但这只是解释了 1 个潜在问题。寻找其他东西。
-
可以肯定的是,您的 pandas_udf 想要将 UUID 添加到原始 df 的所有行,并保留其余列。唯一的问题是 UUID 以某些值为条件。我还好吗?
-
@Alfilercio 在大多数情况下是的。我不一定需要返回每一列,我真的只需要 row_id (不在上面的例子中)和matched_id (上面例子中的uuid)。顺便说一句,比较逻辑相当复杂,所以它可能必须是 pandas_udf。
-
可能最潜在的问题是 uuid 的不确定性生成,正如您在堆栈溢出帖子中已经阅读的那样。在数据操作中真的不是很好,以至于您的 id 在数据处理过程中更改了太多时间。顺便说一下,在最新版本的 spark 中,您可以检查数据帧以避免此类错误。
-
@gccodec 很酷,是的,这就是我的想法...我尝试了一些方法来在 UDF 中生成确定性 ID,然后用 UUID 替换它们,但这让它变得非常慢并且仍然有一些问题。在我们的例子中,我们生成组 UUID,然后保存结果。我们不会在 UDF 输出和存储结果之间执行任何操作,所以我认为我们会没事的。我们将确保密切关注以确保没有记录重复或发生其他意外行为。
标签: python pandas apache-spark pyspark apache-spark-sql