【问题标题】:Sending Items to specific partitions将项目发送到特定分区
【发布时间】:2016-01-14 03:04:54
【问题描述】:

我正在寻找一种将结构发送到预先确定的分区的方法,以便其他 RDD 可以使用它们

假设我有两个键值对的 RDD

val a:RDD[(Int, Foo)]
val b:RDD[(Int, Foo)]

val aStructure = a.reduceByKey(//reduce into large data structure)
b.mapPartitions{
    iter =>
         val usefulItem = aStructure(samePartitionKey)
         iter.map(//process iterator) 
}

我该如何设置分区,以便为 mapPartition 提供我需要的特定数据结构,但我不会有发送所有值的额外开销(如果我要创建一个广播变量)。

我一直认为将对象存储在 HDFS 中,但我不确定这是否是次优解决方案。

我目前正在探索的另一个想法是,是否有某种方法可以创建自定义分区或分区器来保存数据结构(尽管这可能会变得太复杂并成为问题)

感谢您的帮助!

编辑:

Pangea 提出了一个很好的观点,我应该提供更多细节。本质上,我得到了 SparseVectors 的 RDD 和倒排索引的 RDD。倒排索引对象非常大。

我希望在向量的 RDD 中做一个 MapPartitions,我可以将每个向量与倒排索引进行比较。问题是我每个分区只需要一个倒排索引对象,并且进行连接会导致我拥有该索引的大量副本。

val vectors:RDD[(Int, SparseVector)]

val invertedIndexes:RDD[(Int, InvIndex)] = a.reduceByKey(generateInvertedIndex)
vectors:RDD.mapPartitions{
    iter =>
         val invIndex = invertedIndexes(samePartitionKey)
         iter.map(invIndex.calculateSimilarity(_))
         ) 
}

【问题讨论】:

  • 给我们一个示例,说明您对 aStructure 等数据的期望。示例可以为您提供更多答案

标签: hadoop apache-spark hdfs


【解决方案1】:

Partitioner 是一个函数,给定一个通用元素,它将返回它属于哪个分区。它还决定分区的数量。 reduceByKey 有一种形式,它将分区器作为参数。 如果我正确理解您的问题,您希望在进行缩减时对数据进行分区。 看例子:

// create example data
val a =sc.parallelize(List( (1,1),(1,2), (2,3),(2,4) ) )
// create simple sample partitioner - 2 partitions, one for odd
// one for even key.hashCode. You should put your partitioning logic here
val p = new Partitioner { def numPartitions: Int = 2; def getPartition(key:Any) = key.hashCode % 2 }
// your reduceByKey function. Sample: just add
val f = (a:Int,b:Int) => a + b
val rdd = a.reduceByKey(p, f)
// here your rdd will be partitioned the way you want with the number
// of partitions you want
rdd.partitions.size
res8: Int = 2

rdd.map() .. // go on with your processing

【讨论】:

  • 嗨@RobertoCongiu,所以这有点接近我想要的。问题是,一旦我将减少的值放入正确的分区中。我希望以后能够在不同的 RDD 上执行 mapPartitions 时访问这些本地化值。您对我如何做到这一点有任何想法吗?
  • 你需要对其他 RDD 做什么?如果您需要加入两个RDDs,则不需要对其执行mapPartitions,因为join 足够聪明,可以检测到具有相同分区器的RDD。
  • 基本上我得到了稀疏向量的 RDD 和倒排索引的 RDD。倒排索引对象非常大。我希望在向量的 RDD 中做一个 MapPartitions,我可以将每个向量与倒排索引进行比较。问题是我每个分区只需要一个倒排索引对象,并且进行连接会导致我拥有该索引的大量副本。
  • 所以每个分区都有一个大的倒排索引,每个执行器都需要加载每个倒排索引。倒排索引有多大?
  • 这取决于用户制作的桶有多大。桶是总向量空间的预定义百分比(因此,如果有 1000 个向量并且用户决定他们想要 10 个桶,每个桶将保存 100 个值)。当然,它需要可存储在单个对象中,但仍然可以很大。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2018-10-23
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多