Posted: 2020-11-19 01:39:11
Question:
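While writing code that joins two tables using salting (to overcome a skewed-data problem), I observed unexpected behavior when a UDF that creates a random integer column is combined with the join.
If I create the "salt" column with the `rand()` method, it works as expected. But when I use a UDF that generates integers, the resulting join (most of the time) has fewer rows than expected, and the row count changes from run to run (depending on the specific random numbers generated).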
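For reference, the row count a salted join should produce can be checked without Spark. The sketch below is a plain-Scala model of the technique, not Spark code (the object `SaltingSketch`, its methods, and the `NumSalts` constant are hypothetical names for illustration). One side is replicated once per salt value, and every row on the other side receives exactly one salt, so each original match survives exactly once no matter which salts are drawn. With the question's data that is always 5 rows.

```scala
import scala.util.Random

// Plain-Scala model of a salted inner join (no Spark involved).
object SaltingSketch {
  // Assumption: same salt range as the question, i.e. salts 0 and 1
  val NumSalts = 2

  // Unsalted inner join on id: every (left, right) pair with equal ids
  def plainJoin(left: Seq[(Int, Double)], right: Seq[(Int, Double)]): Seq[(Int, Double, Double)] =
    for {
      (lId, lVal) <- left
      (rId, rVal) <- right
      if lId == rId
    } yield (lId, lVal, rVal)

  // Salted inner join on (id, salt):
  //  - the left side gets one copy per salt value in 0 until NumSalts,
  //  - each right row gets exactly one random salt, drawn once.
  def saltedJoin(left: Seq[(Int, Double)], right: Seq[(Int, Double)], rng: Random): Seq[(Int, Int, Double, Double)] = {
    val leftSalted = for { (id, v) <- left; s <- 0 until NumSalts } yield (id, s, v)
    val rightSalted = right.map { case (id, v) => (id, rng.nextInt(NumSalts), v) }
    for {
      (lId, lSalt, lVal) <- leftSalted
      (rId, rSalt, rVal) <- rightSalted
      if lId == rId && lSalt == rSalt
    } yield (lId, lSalt, lVal, rVal)
  }

  def main(args: Array[String]): Unit = {
    // Unsalted version of the question's data1, plus data2 as-is
    val left = Seq((1, 0.11), (2, 0.12))
    val right = Seq((1, 0.1), (1, 0.2), (1, 0.3), (2, 0.4), (2, 0.5))
    println(plainJoin(left, right).size)                // 5
    println(saltedJoin(left, right, new Random()).size) // 5 for every draw
  }
}
```

Because each right-side row carries a single salt that is fixed before the comparison, the salted join returns the same pairs as the plain join, only distributed over more keys.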
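The following code demonstrates the issue:
```scala
// `spark` is an existing SparkSession (e.g. the one provided by spark-shell)
import spark.implicits._
import org.apache.spark.sql.functions.{col, rand, udf}
import org.apache.spark.sql.types.IntegerType
import scala.util.Random

// Side that already contains one row per salt value (0 and 1) for every id
val data1 = Seq(
  (1, 0.11, 0),
  (1, 0.11, 1),
  (2, 0.12, 0),
  (2, 0.12, 1)
)
// Side that receives one random salt per row
val data2 = Seq(
  (1, 0.1),
  (1, 0.2),
  (1, 0.3),
  (2, 0.4),
  (2, 0.5)
)
val df1 = data1.toDF("id", "val", "salt")
val df2 = data2.toDF("id", "val")

// Salt from the built-in rand(): a value in [0, 2) truncated to 0 or 1
val df2_salted = df2.withColumn("salt", rand() * 2).withColumn("salt", col("salt").cast(IntegerType))

// Salt from a Scala UDF returning 0 or 1
val randUdf = udf(() => Random.nextInt(2))
val df2_salted_by_udf = df2.withColumn("salt", randUdf())

val df_join = df1.join(df2_salted, Seq("id", "salt"), "inner")
df_join.show()

val df_join_by_udf = df1.join(df2_salted_by_udf, Seq("id", "salt"), "inner")
df_join_by_udf.show()
```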
The output (from one of my runs) looks like this. `df_join` is (as expected):
```
+---+----+----+---+
| id|salt| val|val|
+---+----+----+---+
|  1|   0|0.11|0.1|
|  1|   1|0.11|0.2|
|  1|   1|0.11|0.3|
|  2|   1|0.12|0.4|
|  2|   0|0.12|0.5|
+---+----+----+---+
```
but the `df_join_by_udf` output is:
```
+---+----+----+---+
| id|salt| val|val|
+---+----+----+---+
|  1|   0|0.11|0.1|
|  1|   0|0.11|0.2|
+---+----+----+---+
```
In another run I got the following for `df_join_by_udf`:
```
+---+----+----+---+
| id|salt| val|val|
+---+----+----+---+
|  2|   0|0.12|0.4|
+---+----+----+---+
```
And so on.
Why does my UDF behave differently from the `rand()` method, which gives the expected join result?
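One variant worth comparing, sketched under assumptions: the Spark documentation for `functions.udf` notes that user-defined functions are considered deterministic by default, so the optimizer may eliminate duplicate invocations or even invoke the function more times than it appears in the query, whereas `rand()` is a non-deterministic expression. The sketch below assumes Spark 2.3 or later (where `UserDefinedFunction.asNondeterministic()` exists) and a local `SparkSession`; it reruns the UDF join with the same UDF body flagged as non-deterministic. `SaltUdfVariant` and `joinedCount` are hypothetical names used only for illustration.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf
import scala.util.Random

// Hypothetical helper: the question's UDF join, with the UDF marked non-deterministic.
object SaltUdfVariant {
  def joinedCount(spark: SparkSession): Long = {
    import spark.implicits._
    val df1 = Seq((1, 0.11, 0), (1, 0.11, 1), (2, 0.12, 0), (2, 0.12, 1)).toDF("id", "val", "salt")
    val df2 = Seq((1, 0.1), (1, 0.2), (1, 0.3), (2, 0.4), (2, 0.5)).toDF("id", "val")

    // Same body as randUdf in the question; asNondeterministic() tells the
    // optimizer it must not assume repeated calls return the same value.
    val randUdf = udf(() => Random.nextInt(2)).asNondeterministic()
    val df2Salted = df2.withColumn("salt", randUdf())

    df1.join(df2Salted, Seq("id", "salt"), "inner").count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("salt-udf-variant").getOrCreate()
    try println(joinedCount(spark))
    finally spark.stop()
  }
}
```

Comparing this count with the row counts of the original `df_join_by_udf` over several runs is one way to check whether Spark's determinism assumption for UDFs accounts for the difference from `rand()`.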
Tags: scala join apache-spark-sql user-defined-functions