【问题标题】:Unexpected behavior of UDF for random integers with join operation具有连接操作的随机整数的 UDF 的意外行为
【发布时间】:2020-11-19 01:39:11
【问题描述】:

在编写使用加盐连接两个表的代码时(以克服倾斜数据问题),我观察到 UDF 的意外行为,它与连接一起创建随机整数列。

如果我使用 rand() 方法创建“盐”列,它会按预期工作,但是当我使用生成整数的 UDF 时,生成的连接(大多数情况下)的行数比预期的少,并且行数我的变化(取决于生成的特定随机数)。

以下代码显示了问题。

    import spark.implicits._

    val data1 = Seq(
      (1,0.11,0),
      (1,0.11,1),
      (2,0.12,0),
      (2,0.12,1)
    )

    val data2 = Seq(
      (1,0.1),
      (1,0.2),
      (1,0.3),
      (2,0.4),
      (2,0.5)
    )

    val df1 = data1.toDF("id","val","salt")
    val df2 = data2.toDF("id","val")

    val df2_salted = df2.withColumn("salt", rand() * 2).withColumn("salt", col("salt").cast(IntegerType))

    import scala.util.Random
    import org.apache.spark.sql.functions.udf
    val randUdf = udf({() => Random.nextInt(2)})
    
    val df2_salted_by_udf = df2.withColumn("salt", randUdf())

    val df_join = df1.join(df2_salted, Seq("id", "salt"), "inner")
    df_join.show()

    val df_join_by_udf = df1.join(df2_salted_by_udf, Seq("id", "salt"), "inner")
    df_join_by_udf.show()

输出(在我运行的一次执行中)看起来像这样。 df_join 是(如预期的那样):


+---+----+----+---+
| id|salt| val|val|
+---+----+----+---+
|  1|   0|0.11|0.1|
|  1|   1|0.11|0.2|
|  1|   1|0.11|0.3|
|  2|   1|0.12|0.4|
|  2|   0|0.12|0.5|
+---+----+----+---+```

but df_join_by_udf  output is:
```+---+----+----+---+
| id|salt| val|val|
+---+----+----+---+
|  1|   0|0.11|0.1|
|  1|   0|0.11|0.2|
+---+----+----+---+```

In another run I got the following for df_join_by_udf
```+---+----+----+---+
| id|salt| val|val|
+---+----+----+---+
|  2|   0|0.12|0.4|
+---+----+----+---+

等等

为什么我的 UDF 的行为与 rand() 方法不同(它给出了预期的连接结果)

【问题讨论】:

    标签: scala join apache-spark-sql user-defined-functions


    【解决方案1】:

    你使用的 udf 实际上并不是一个“真正的”函数;从某种意义上说,当应用于相同的输入(即 epsilon 值)时,它不会产生相同的输出。

    为了实现您的目标,一种可能的解决方法是强制 spark 以避免重新计算盐值。比如

    val df3 = df2_salted_by_udf.persist
    val df3_join = df1.join(df3, Seq("id", "salt"), "inner")
    df3_join.show()
    

    应该可以。以我为例,它返回

    +---+----+----+---+
    | id|salt| val|val|
    +---+----+----+---+
    |  1|   0|0.11|0.1|
    |  1|   1|0.11|0.2|
    |  1|   0|0.11|0.3|
    |  2|   1|0.12|0.4|
    |  2|   1|0.12|0.5|
    +---+----+----+---+
    

    【讨论】:

    • UDF 的输出确实会在运行之间发生变化,但无论它是什么,它都会生成一个随机值 0 或 1。我的问题是,为什么它会生成(或一个)所需的输出,但它不会'不正确地进行连接。我不是在寻找解决方法。 rand() * 2 已经完成了这项工作。我试图了解我看到的行为。
    • 函数在join操作过程中被多次调用(即使是同一行),每次都返回一个可能的不同值
    • 你说得对,问题是由于我的 UDF 是不确定的。 Spark 的优化假设 UDF 是确定性的,因此是观察到的行为。解决方法是使用udf的asNonDeterminitic()方法。
    【解决方案2】:

    在阅读了更多和一些提示后,我了解到问题与 Spark 的假设有关,即 UDF 是一个确定性函数,并结合了它在连接执行中所做的一些优化。

    有一个选项可以告诉 Spark UDF 是非确定性的(方法 [asNondeterministic]1)。

    所以,要修复我的代码,我会将其更改为

        val randUdfAsNonDeterministic = randUdf.asNondeterministic()
        val df2_salted_by_udf = df2.withColumn("salt", randUdfAsNonDeterministic())```
    
      [1]: https://spark.apache.org/docs/2.3.1/api/java/org/apache/spark/sql/expressions/UserDefinedFunction.html#asNondeterministic--
    

    【讨论】:

      猜你喜欢
      • 2018-05-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-03-01
      • 2016-08-13
      • 1970-01-01
      • 2017-02-26
      • 1970-01-01
      相关资源
      最近更新 更多