有没有办法手动设置RDD分区的preferredLocations?
是的,有,但它是特定于 RDD 的,因此不同类型的 RDD 有不同的方法。
Spark 使用 RDD.preferredLocations 获取首选位置列表以计算每个分区/拆分(例如 HDFS 文件的块位置)。
final def preferredLocations(split: Partition): Seq[String]
获取分区的首选位置,考虑 RDD 是否设置检查点。
如您所见,方法是 final,这意味着没有人可以覆盖它。
当您查看RDD.preferredLocations 的source code 时,您会看到RDD 如何知道其首选位置。它使用受保护的 RDD.getPreferredLocations 方法,自定义 RDD 可以(但不必)覆盖以指定放置首选项。
protected def getPreferredLocations(split: Partition): Seq[String] = Nil
所以,现在问题已经“演变”成另一个问题,即允许设置首选位置的 RDD 是什么。找到你的并查看源代码。
我正在使用一个数组和“Parallelize”方法从中创建一个 RDD。
如果您 parallelize 您的本地数据集,它不再是分布式的并且可以是分布式的,但是...为什么要使用 Spark 来处理可以在单个计算机/节点上本地处理的东西?
如果您坚持并且确实想将 Spark 用于本地数据集,SparkContext.parallelize 背后的 RDD 是...让我们看一下源代码...ParallelCollectionRDD 其中does allow for location preferences。
然后让我们将您的问题改写为以下内容(希望我不会丢失任何重要的事实):
允许创建ParallelCollectionRDD 并明确指定位置首选项的运算符有哪些?
令我惊讶的是(因为我不知道该功能),有这样一个运算符,即SparkContext.makeRDD,它...接受每个对象的一个或多个位置首选项(Spark 节点的主机名)。
makeRDD[T](seq: Seq[(T, Seq[String])]): RDD[T] 分发一个本地 Scala 集合以形成一个 RDD,具有一个或多个位置偏好(Spark 节点的主机名)为每个对象。为每个集合项创建一个新分区。
换句话说,你必须使用makeRDD,而不是使用parallelise(它在Spark Core API for Scala中可用,但我不确定我要留给你的Python作为家庭练习:) )
我将同样的推理应用于创建某种 RDD 的任何其他 RDD 运算符/转换。