【发布时间】:2015-05-05 16:59:54
【问题描述】:
尝试制作由两个地图组成的 Spark 广播 BiMap。由于映射在任一方向上都是唯一的,所有应该序列化的只是一个 Map,实际上只有 a Seq[(K, V)] 需要序列化。所以只有底层转发 Map 的元素。在反序列化中,我们可以重新创建逆 Map 和索引。
这是建议的设计:
class BiMap[K, V] (
private val m: Map[K, V],
// if this is serialized we allow i to be discarded and recalculated when deserialized
@transient private var i: Option[BiMap[V, K]] = None
) extends Serializable {
// NOTE: make inverse's inverse point back to current BiMap
// if this is serialized we allow inverse to be discarded and recalculated
// when first invoked from "val size_" in the constructor
@transient lazy val inverse: BiMap[V, K] = {
if( i == null.asInstanceOf[Option[BiMap[V, K]]] )
i = None
i.getOrElse {
val rev = m.map(_.swap)
require((rev.size == m.size), "Failed to create reversed map. Cannot have duplicated values.")
new BiMap(rev, Some(this))
}
}
// forces inverse to be calculated in the constructor when deserialized
// not when first used
@transient val size_ = inverse.size
...
}
虽然这似乎可行,但我不明白为什么我必须检查 i 是否为空,但反序列化后它为空。最初它是一个默认初始化 = None 的 val。
只有 m 应该被序列化,所以逆是 @transient lazy 并且还有另一个 @transient val size_ = inverse.size 意味着在反序列化时(而不是在任务调用 inverse 时)计算逆。最后一点是为了确保逆是共享的,而不是由每个任务重新创建。
虽然这似乎可行,但有点难看,我仍然不确定一些事情:
- 实例的所有存储都分配在广播变量中,而不是在任务堆空间中?
- 为什么
i需要是一个 var 并在它不应该为空时检查是否为空? - 最重要的是,这是否会导致反序列在广播时被丢弃并在反序列化中重新创建?
我知道我需要向 Kryo 注册它并最终实现 KryoSerializable 以精细控制序列化。
【问题讨论】:
标签: scala serialization apache-spark kryo