【发布时间】:2020-11-08 15:46:16
【问题描述】:
在使用 Spark-RDD API 时,我们可以使用广播变量来优化 spark 分配不可变状态的方式。
1) 广播变量如何在内部工作?
我的假设是: 对于每个用于对数据集执行操作的闭包,它所引用的所有变量都必须被序列化、通过网络传输并与任务一起恢复,以便可以执行闭包。
像这样注册广播变量时:
val broadcastVar = sc.broadcast("hello world")
返回的对象(Broadcast[String])不保留对实际对象(“hello world”)的引用,而只保留一些 ID。
当一个广播变量句柄从上面所说的闭包中被引用时,它将像所有其他变量一样被序列化 - 只是广播变量句柄本身不包含实际对象。
稍后在目标节点上执行闭包时,实际对象(“hello world”)已经传输到每个节点。当闭包到达调用 broadcastVar.value 的位置时,广播变量句柄会在内部使用 ID 检索实际对象。
这个假设正确吗?
2) 有没有办法在 Spark-SQL 中利用这种机制?
假设我有一组允许的值。
当使用 RDD-API 时,我会为我的 allowedValues 创建一个广播变量:
val broadcastAllowedValues = sc.broadcast(allowedValues) // Broadcast[Set[String]]
rdd.filter(row => broadcastAllowedValues.value.contains(row("mycol")))
当然,在使用 Spark-SQL-API 时,我会使用 Column.isin / Column.isInCollection 方法:
dataframe.where(col("mycol").isInCollection(allowedValues))
但我似乎无法通过这种方式获得广播变量的优势。
另外,如果我将这段代码更改为以下内容:
val broadcastAllowedValues = sc.broadcast(allowedValues) // Broadcast[Set[String]]
dataframe.where(col("mycol").isInCollection(allowedValues.value))
这部分:
col("mycol").isInCollection(allowedValues.value)
// and more important this part:
allowedValues.value
将已在驱动程序上进行评估,从而产生一个新的Column-Object。所以广播变量在这里失去了它的优势。与第一个示例相比,它甚至会产生一些开销...
有没有办法使用 Spark-SQL-API 来利用广播变量,或者我必须在这些点上显式使用 RDD-API?
【问题讨论】:
标签: scala apache-spark apache-spark-sql