【问题标题】:Usage of Broadcast Variables when using only Spark-SQL API仅使用 Spark-SQL API 时广播变量的使用
【发布时间】:2020-11-08 15:46:16
【问题描述】:

在使用 Spark-RDD API 时,我们可以使用广播变量来优化 spark 分配不可变状态的方式。

1) 广播变量如何在内部工作?

我的假设是: 对于每个用于对数据集执行操作的闭包,它所引用的所有变量都必须被序列化、通过网络传输并与任务一起恢复,以便可以执行闭包。

像这样注册广播变量时​​:

val broadcastVar = sc.broadcast("hello world")

返回的对象(Broadcast[String])不保留对实际对象(“hello world”)的引用,而只保留一些 ID。 当一个广播变量句柄从上面所说的闭包中被引用时,它将像所有其他变量一样被序列化 - 只是广播变量句柄本身不包含实际对象。

稍后在目标节点上执行闭包时,实际对象(“hello world”)已经传输到每个节点。当闭包到达调用 broadcastVar.value 的位置时,广播变量句柄会在内部使用 ID 检索实际对象。

这个假设正确吗?

2) 有没有办法在 Spark-SQL 中利用这种机制?

假设我有一组允许的值。

当使用 RDD-API 时,我会为我的 allowedValues 创建一个广播变量:

val broadcastAllowedValues = sc.broadcast(allowedValues) // Broadcast[Set[String]]

rdd.filter(row => broadcastAllowedValues.value.contains(row("mycol")))

当然,在使用 Spark-SQL-API 时,我会使用 Column.isin / Column.isInCollection 方法:

dataframe.where(col("mycol").isInCollection(allowedValues))

但我似乎无法通过这种方式获得广播变量的优势。

另外,如果我将这段代码更改为以下内容:

val broadcastAllowedValues = sc.broadcast(allowedValues) // Broadcast[Set[String]]

dataframe.where(col("mycol").isInCollection(allowedValues.value))

这部分:

col("mycol").isInCollection(allowedValues.value)
// and more important this part:
allowedValues.value

将已在驱动程序上进行评估,从而产生一个新的Column-Object。所以广播变量在这里失去了它的优势。与第一个示例相比,它甚至会产生一些开销...

有没有办法使用 Spark-SQL-API 来利用广播变量,或者我必须在这些点上显式使用 RDD-API?

【问题讨论】:

    标签: scala apache-spark apache-spark-sql


    【解决方案1】:

    广播变量如何在内部工作?

    广播的数据被序列化并物理移动到所有执行器。根据Broadcast Variables 上的文档,它说

    “广播变量允许程序员在每台机器上缓存一个只读变量,而不是随任务一起发送它的副本。”

    有没有办法在 Spark-SQL 中利用这种机制?

    是的,有一种方法可以利用。 Spark 在加入大小数据帧时默认应用 Broadcast Hash Join

    根据“Learning Spark - 2nd edition”一书,它说:

    "默认情况下,如果较小的数据集小于 10MB,Spark 将使用广播连接。此配置在spark.sql.autoBroadcastJoinThreshold 中设置;您可以根据每个执行器上的内存量和在司机。”

    在您的情况下,您需要将所有唯一的 allowedValues 列出到一个只有一列(称为 allowValues 的列)的简单数据框(称为 allowedeValuesDF 的数据框)并应用连接来过滤您的 @ 987654328@.

    类似这样的:

    import org.apache.spark.sql.functions.broadcast
    val result = dataframe.join(broadcast(allowedValuesDF), "mycol === allowedValues")
    

    实际上,您可以省略 broadcast,因为 Spark 默认会进行广播加入。

    编辑:

    在更高版本的 Spark 中,您还可以在 SQL 语法中使用 join hints 来告诉执行引擎使用哪些策略。 SQL Documentation 中提供了详细信息,下面提供了一个示例:

    -- Join Hints for broadcast join 
    SELECT /*+ BROADCAST(t1) */ * FROM t1 INNER JOIN t2 ON t1.key = t2.key;
    

    【讨论】:

    • 感谢您的回答。我根据您的回答测试了我的过滤器的 3 个变体之间的时间差(请参阅:pastebin.com/rwgkTDPC)实际上,第三种方法(runWithBroadcastVariable)是我设置中最快的。加入只是有点慢。但我无法理解的是,为什么第一个(runWithInCollection 实际上比其他两种变体慢约 10 倍。理论上,它不应该几乎等同于runWithBroadcastVariable 吗?我知道这不是基准配置,但时间差异很大
    • 你是对的,这看起来不像一个合适的基准。我确信在某些情况下,简单的广播作为 hashed 连接会更快,特别是对于 simpöe number rangers,其中哈希不会有任何改进,只会增加一些成本。我也不熟悉spark.run 测量以及它实际考虑的内容。也许查看数据框的explain 图表会更加清楚。
    • 这张explain 图表很可能表明runWithInCollection 需要大量数据跨分区混洗,这非常昂贵。
    • 实际上,runWithInCollection 根本不需要任何洗牌。但它通常会创建某个类的动态实现,如果集合太大(10000 个 Ints 似乎已经太大而无法编译),这将不起作用。例如,当仅使用 500 个 allowedValues 时,runWithInCollection 实际上是所有 3 中最快的请参阅:issues.apache.org/jira/browse/SPARK-33383
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2016-08-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多