【问题标题】:spark sql - whether to use row transformation or UDFspark sql - 是否使用行转换或 UDF
【发布时间】:2017-09-10 16:41:41
【问题描述】:

我有一个包含 100 列和 1000 万条记录的输入表 (I)。我想得到一个有 50 列的输出表 (O),这些列是从 I 的列派生的,即将有 50 个函数将 I 的列映射到 O 的 50 列,即 o1 = f( i1) , o2 = f(i2, i3) ..., o50 = f(i50, i60, i70).

在 spark sql 中,我可以通过两种方式做到这一点:

  1. 行转换,将 I 的整行逐一解析(例如:映射函数)以生成 O 行。
  2. 使用我猜想在列级别工作的 UDF,即将 I 的现有列作为输入并生成 O 的相应列之一,即使用 50 个 UDF 函数。

鉴于我正在处理整个输入表 I并生成全新的输出表 O 即批量数据处理。

【问题讨论】:

  • 您好 sunillp,正如您所描述的,我正面临行与列转换之间的确切困境。你能分享一下经验吗?你最后做了什么,最后表现如何?谢谢。

标签: sql apache-spark apache-spark-sql user-defined-functions


【解决方案1】:

我本来打算写关于 Catalyst optimizer 的全部内容,但只要注意 Jacek Laskowski 在他的书中 Mastering Apache Spark 2 中所说的内容就更简单了:

"在恢复使用您自己的自定义 UDF 函数之前,尽可能使用更高级别的标准基于列的函数和数据集运算符,因为 UDF 是 Spark 的黑盒,因此它甚至不会尝试优化它们。"

Jacek 还注意到 Spark 开发团队中某人的评论:

"在一些简单的情况下,我们可以分析 UDF 字节码并推断它在做什么,但通常很难做到。"

这就是为什么 Spark UDF 永远不应成为您的首选。

同样的观点在 Cloudera post 中得到了回应,作者指出“...使用 Apache Spark 的内置 SQL 查询函数通常会带来最佳性能,并且应该是任何时候考虑的第一种方法可以避免引入 UDF。"

但是,作者也正确地指出,随着 Spark 变得更加智能,这可能会在未来发生变化,同时,如果您不介意的话,您可以使用 Expression.genCode,如 Chris Fregly 的 talk 中所述耦合到 Catalyst 优化器。

【讨论】:

  • 我对此有些不清楚。无论我将它用作映射函数(用于基于行的转换)还是用作 UDF(用于基于列的转换),我用于转换的函数说“f(x)”(在 scala 中)都是相同的。那么在这两种情况下,spark 都会有优化问题?
  • 这不是黑白的。优化是一个频谱。 UDF 是最不可优化的。从那里您可以通过使用DataFramesColumns 来进一步优化,尽可能使用built-in library functions,利用Spark 和存储格式(如Parquet 等)之间的存储优化支持。您也不必猜测。您可以在多个场景中询问 Spark 的逻辑和物理计划,以了解哪些更有效,哪些更改影响最大。
  • 感谢生成您自己的代码的链接!我认为这是将一些现有的“优化”(从 Java 的角度)例程移植到 Spark,并在现代框架中重用“遗留”C 代码的好方法。我想知道拥有可插入的代码生成后端会有多少工作量和复杂性,因此即使是手动调整的汇编代码也可以直接生成,而不必将其包装在 C 中。当然,这是一个极端情况,并且支持优化库满足 98% 的需求。
【解决方案2】:

在简单的情况下,使用 UDF 是更好的选择,因为它不需要完整的编码和解码。 UDF 只能访问必填字段并对结果进行编码。

自 Spark 2.0 以来,它也得到了更好的支持,并且可以在某种程度上在执行计划中进行优化。

映射完整的Row 和应用标准 UDF 都无法从所有 Spark SQL 优化中受益,并且在数据分布和并行化方面没有区别。

【讨论】:

  • 那么当内置列函数无法完成相同的任务时,UDF比使用自定义逻辑的map函数的基于行的转换效率更高?
【解决方案3】:

通过使用parent dataframe 的列,Spark 拥有inbuilt functionsnew dataframe

这些函数的性能将优于使用udf 函数或使用row 转换的两个建议选择。

考虑到优化的数据分布,为最终表的 50 个必需列编写 row 转换函数将是一个真正的麻烦。

如果您的函数 (o1 = f(i1) , o2 = f(i2, i3) ..., o50 = f(i50, i60, i70)) 不能被 inbuilt functions 或它们的组合替换,那么只有我建议您使用 udf 函数,因为 udf 函数需要对数据进行序列化和反序列化。

【讨论】:

    【解决方案4】:

    用户定义函数或自定义函数可以在 Spark SQL 中定义和注册为 UDF,并具有可用于 SQL 查询的关联别名。

    UDF 对 Apache Spark SQL 的性能影响很大 (Spark SQL’s Catalyst Optimizer)

    由于我们在 Spark 中没有任何定义的规则,因此开发人员可以使用他/她的尽职调查。

    Python UDF 从不使用 UDF。补偿 Python 解释器和 JVM 之间重复序列化、反序列化和数据移动的成本是不可能的,Python UDF 导致数据在执行器 JVM 和运行 UDF 逻辑的 Python 解释器之间被序列化——与 UDF 实现相比,这显着降低了性能Java 或 Scala。

    Java、Scala UDF 实现可由执行程序 JVM 直接访问。 所以 Java 、Scala UDF 性能优于 Python UDF

    Spark SQL 函数直接在 JVM 上运行,并使用 Catalyst 和 Tungsten 进行优化。这意味着这些可以在执行计划中进行优化,并且大多数时候可以从 codgen 和其他 Tungsten 优化中受益。此外,这些可以对其“本机”表示形式的数据进行操作,因为 Spark SQL 将与 Catalyst 查询优化器一起使用。它的功能随着每个版本的发布而扩展,并且通常可以为 Spark SQL 查询提供显着的性能改进;

    结论: Catalyst 可能不太了解 UDF 实现代码,因此使用 Apache Spark 的内置 SQL 查询函数通常会带来最佳性能,并且应该是在可以避免引入 UDF 时首先考虑的方法。

    【讨论】:

    • 你有例子吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-09-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-03-15
    • 1970-01-01
    相关资源
    最近更新 更多