【问题标题】:Identifying records present in one dataframe column but not present in another dataframe column识别存在于一个数据框列中但不存在于另一个数据框列中的记录
【发布时间】:2019-09-20 09:08:45
【问题描述】:

我正在尝试选择 foo 中存在但 bar 中不存在的所有元素。我正在使用此代码:

foo.repartition(1)
foo.cache()
bar.repartition(1)
bar.cache()
foo.select("col_1").except(bar.select("col_1"))

有没有更好或更快的方法来做到这一点?目前在集群上运行时需要超过 15 分钟以上。

附加信息: foo 将有大约 100-1000 个元素。 bar 将有 4000 万多个元素。 foo 是一个数据框,由使用 Spark SQL 从 hive 表(50 列)读取的数据组成。 bar 是一个数据框,由使用 KuduContext 从 kudu 表(250 列)读取的数据组成。

在带有 Scala 2.11.8 的 CDH 5.15.x 上使用 Spark 2.2。

【问题讨论】:

  • 好奇:你为什么用repartition(1)?特别是对于bar(更大的数据框)。您是否尝试省略重新分区?您正在强制 Spark 为这项工作使用单个执行程序,这可能会显着减慢速度
  • 我试过两个数据帧都没有repartition(1)。我在阅读其他一些帖子后添加了它,认为它可能会有所帮助。我对使用 Spark 比较陌生。另外,有没有repartition(1)也没有时差。
  • FWIW,对于 bar,来自 kudu 表的数据帧,表本身在主键(列)上定义了 hash partitioncol_1 是主键列之一。

标签: scala apache-spark dataframe apache-spark-sql


【解决方案1】:

正如@Tzach 建议的那样,最好避免使用bar.repartition(1)bar.cache(),因为条形数据集似乎太大而无法放入内存。您可以将缓存用于小数据集,甚至更好地尝试将其广播给每个执行器。此外,如果您知道大数据集的大小,您可以使用partition_num = total_size / 500MB 计算分区号,每个分区的理想大小为 250-500 MB,因此如果您的数据为 10GB,则应为 10GB/500MB = 20 个分区。

这是您在上述更改后的代码:

foo.cache() //feel free to cache the small dataset
bar.repartition(partitions_num) //this is optional

foo.select("col_1").except(bar.select("col_1"))

您也可以尝试使用left_anti 连接,如下所示并比较它们的性能:

foo.join(bar, foo("col_1") === bar("col_1"), "left_anti").show

这将排除 foo 中 col_1 存在于 bar 中的所有记录。

如果您的要求需要相反的,即排除 foo 中存在的 bar 中的记录,那么您的程序可以通过广播小数据集 foo 来提高效率,如下一个代码 sn-p 所示:

import org.apache.spark.sql.functions.broadcast

bar.join(broadcast(foo), bar("col1") === foo("col1"), "left_anti").show

祝你好运!

【讨论】:

    猜你喜欢
    • 2015-01-21
    • 2022-01-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多