【发布时间】:2016-01-14 03:04:54
【问题描述】:
我正在寻找一种将结构发送到预先确定的分区的方法,以便其他 RDD 可以使用它们
假设我有两个键值对的 RDD
val a:RDD[(Int, Foo)]
val b:RDD[(Int, Foo)]
val aStructure = a.reduceByKey(//reduce into large data structure)
b.mapPartitions{
iter =>
val usefulItem = aStructure(samePartitionKey)
iter.map(//process iterator)
}
我该如何设置分区,以便为 mapPartition 提供我需要的特定数据结构,但我不会有发送所有值的额外开销(如果我要创建一个广播变量)。
我一直认为将对象存储在 HDFS 中,但我不确定这是否是次优解决方案。
我目前正在探索的另一个想法是,是否有某种方法可以创建自定义分区或分区器来保存数据结构(尽管这可能会变得太复杂并成为问题)
感谢您的帮助!
编辑:
Pangea 提出了一个很好的观点,我应该提供更多细节。本质上,我得到了 SparseVectors 的 RDD 和倒排索引的 RDD。倒排索引对象非常大。
我希望在向量的 RDD 中做一个 MapPartitions,我可以将每个向量与倒排索引进行比较。问题是我每个分区只需要一个倒排索引对象,并且进行连接会导致我拥有该索引的大量副本。
val vectors:RDD[(Int, SparseVector)]
val invertedIndexes:RDD[(Int, InvIndex)] = a.reduceByKey(generateInvertedIndex)
vectors:RDD.mapPartitions{
iter =>
val invIndex = invertedIndexes(samePartitionKey)
iter.map(invIndex.calculateSimilarity(_))
)
}
【问题讨论】:
-
给我们一个示例,说明您对 aStructure 等数据的期望。示例可以为您提供更多答案
标签: hadoop apache-spark hdfs