【问题标题】:What problems can arise from a Spark non-deterministic Pandas UDFSpark 非确定性 Pandas UDF 会出现什么问题
【发布时间】:2020-09-02 23:12:57
【问题描述】:

我正在编写一个流程,该流程需要为根据某些条件匹配的某些组生成 UUID。我让我的代码正常工作,但我担心在我的 UDF 中创建 UUID 的潜在问题(从而使其不确定)。下面是一些代码的简化示例来说明:

from uuid import uuid1

from pyspark.sql import SparkSession
from pyspark.sql.functions import PandasUDFType, pandas_udf

spark = (
    SparkSession.builder.master("local")
    .appName("Word Count")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)
df = spark.createDataFrame([["j", 3], ["h", 3], ["a", 2]], ["name", "age"])


@pandas_udf("name string, age integer, uuid string", PandasUDFType.GROUPED_MAP)
def create_uuid(df):
    df["uuid"] = str(uuid1())
    return df


>>> df.groupby("age").apply(create_uuid).show()
+----+---+--------------------+
|name|age|                uuid|
+----+---+--------------------+
|   j|  3|1f8f48ac-0da8-430...|
|   h|  3|1f8f48ac-0da8-430...|
|   a|  2|d5206d03-bcce-445...|
+----+---+--------------------+

这目前适用于在 AWS Glue 上处理超过 20 万条记录的一些数据,我还没有发现任何错误。

我使用uuid1,因为它使用节点信息来生成 UUID,从而确保没有 2 个节点生成相同的 id。

我的一个想法是将 UDF 注册为非确定性:

udf = pandas_udf(
    create_uuid, "name string, age integer, uuid string", PandasUDFType.GROUPED_MAP
).asNondeterministic()

但这给了我以下错误:

py4j.protocol.Py4JJavaError: An error occurred while calling o60.flatMapGroupsInPandas.
: org.apache.spark.sql.AnalysisException: nondeterministic expressions are only allowed in
Project, Filter, Aggregate or Window, found:
 `age`,create_uuid(name, age),`name`,`age`,`uuid`
in operator FlatMapGroupsInPandas [age#1L], create_uuid(name#0, age#1L), [name#7, age#8, uuid#9]
               ;;
FlatMapGroupsInPandas [age#1L], create_uuid(name#0, age#1L), [name#7, age#8, uuid#9]
+- Project [age#1L, name#0, age#1L]
   +- LogicalRDD [name#0, age#1L], false

我的问题是:

  • 这可能会遇到哪些潜在问题?
  • 如果它确实存在潜在问题,我可以在哪些方面做出确定性?
  • 为什么不能将 GROUPED_MAP 函数标记为非确定性函数?

【问题讨论】:

  • 顺便说一句,我已经阅读了stackoverflow.com/questions/42960920/…,但这只是解释了 1 个潜在问题。寻找其他东西。
  • 可以肯定的是,您的 pandas_udf 想要将 UUID 添加到原始 df 的所有行,并保留其余列。唯一的问题是 UUID 以某些值为条件。我还好吗?
  • @Alfilercio 在大多数情况下是的。我不一定需要返回每一列,我真的只需要 row_id (不在上面的例子中)和matched_id (上面例子中的uuid)。顺便说一句,比较逻辑相当复杂,所以它可能必须是 pandas_udf。
  • 可能最潜在的问题是 uuid 的不确定性生成,正如您在堆栈溢出帖子中已经阅读的那样。在数据操作中真的不是很好,以至于您的 id 在数据处理过程中更改了太多时间。顺便说一下,在最新版本的 spark 中,您可以检查数据帧以避免此类错误。
  • @gccodec 很酷,是的,这就是我的想法...我尝试了一些方法来在 UDF 中生成确定性 ID,然后用 UUID 替换它们,但这让它变得非常慢并且仍然有一些问题。在我们的例子中,我们生成组 UUID,然后保存结果。我们不会在 UDF 输出和存储结果之间执行任何操作,所以我认为我们会没事的。我们将确保密切关注以确保没有记录重复或发生其他意外行为。

标签: python pandas apache-spark pyspark apache-spark-sql


【解决方案1】:

您的函数是非确定性的,但 Spark 将其视为确定性,即 "Due to optimization, duplicate invocations maybe eliminated"。但是,对pandas_udf 的每次调用都将是一个唯一的输入(按键分组的行),因此不会触发对pandas_udf 的重复调用的优化。因此,抑制此类优化的asNondeterministic 方法对于GROUPED_MAP 类型的pandas_udf 是多余的。在我看来,这解释了为什么 GroupedData.apply 函数没有被编码为接受标记为非确定性的 pandas_udf。这是没有意义的,因为没有可以抑制的优化机会。

【讨论】:

  • 感谢您的回复...但是问题 1:我指的是它是非确定性的,而不是 uuid 选择。 uuid1 使用 MAC 地址、当前时间和随机序列生成;尽管我认为可能会发生碰撞,但我并不太担心,因为这不太可能发生。问题2:语法无关紧要,无论你如何装饰它,装饰函数都是一样的。您的代码仍然给出完全相同的错误。问题 3:这与您对 #2 的回答相矛盾,并没有真正解释原因。我已经知道它不适用于 Pandas UDF,但想了解原因。
  • 抱歉,我以为我已经成功了,但我错了。现在尝试它不像你说的那样工作:(
  • 不用担心,感谢您的回答!它当然解释了为什么GROUPED_MAP UDF 不能被标记为非确定性的。基于此,似乎唯一可能出现的问题是每次操作后具有不同的值。因为,在我们的例子中,我们会立即存储结果,这应该不是问题。我们必须注意的一件事是我们在 UDF 中记录的内容(记录生成的 UUID 可能毫无意义,因为它可能与实际存储的内容不匹配)。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-08-28
  • 1970-01-01
  • 2020-10-04
  • 2013-07-04
相关资源
最近更新 更多