【问题标题】:What Transformation should I apply on Spark DataFrame我应该在 Spark DataFrame 上应用什么转换
【发布时间】:2018-07-25 16:54:59
【问题描述】:

我有 2 个 Spark 数据帧(A 和 B),两者都有一个公共列/字段(这是 DataFrame A 中的主键,但不是 B 中的主键)。

对于数据框 A 中的每条记录/行,数据框 B 中有多条记录。 基于该公共列值,我想针对数据帧 A 中的每条记录从数据帧 B 中获取所有记录。

我应该执行什么样的转换才能将记录收集在一起而不做太多洗牌?

【问题讨论】:

  • 你应该做一个join
  • 你能提供示例输入数据吗?

标签: apache-spark apache-spark-sql


【解决方案1】:

要合并来自 2 个或更多 spark Dataframes 的记录,连接是必要的。

如果您的数据没有很好地分区/分桶,则会导致Shuffle join。其中每个节点都与每个其他节点通信,并且它们根据哪个节点具有某个密钥或一组密钥(您正在加入)共享数据。这些连接很昂贵,因为网络可能会被流量拥塞。

如果满足以下条件,则可以避免洗牌:

  • 两个数据帧都有一个已知的分区器或分桶。
  • 其中一个数据集小到可以放入内存,在这种情况下,我们可以进行广播哈希连接

分区

如果您在连接之前对数据进行了正确的分区,那么最终的执行效率会大大提高,因为即使计划了 shuffle,如果来自两个不同 DataFrame 的数据已经位于同一台机器上,Spark 也可以避免 shuffle .

df1.repartition(col("id"))
df2.repartition(col("id"))

// you can optionally specify the number of partitions like:        
df1.repartition(10, col("id"))

// Join Dataframes on id column    
df1.join(df2, "id") // this will avoid the duplicate id columns in output DF.

广播哈希加入

当其中一个数据集小到足以放入单个工作节点的内存时,我们可以优化我们的连接。

Spark 会将小型 DataFrame 复制到集群中的每个工作节点(无论它位于一台机器上还是多台机器上)。现在这听起来很昂贵。但是,这样做会阻止我们在整个加入过程中执行全对全通信。相反,它在开始时只执行一次,然后让每个单独的工作节点执行工作,而无需等待或与任何其他工作节点通信。

import org.apache.spark.sql.functions.broadcast
// explicitly specify the broadcast hint, though spark handles it.
df1.join(broadcast(df2), "id")

【讨论】:

    猜你喜欢
    • 2016-05-26
    • 1970-01-01
    • 1970-01-01
    • 2023-04-05
    • 2021-01-08
    • 1970-01-01
    • 1970-01-01
    • 2019-04-11
    • 1970-01-01
    相关资源
    最近更新 更多