【问题标题】:How to achieve dynamic load-balancing of tasks in Apache Spark如何在 Apache Spark 中实现任务的动态负载均衡
【发布时间】:2018-06-16 19:07:47
【问题描述】:

我知道在 Spark 中我可以使用多个分区来拆分我的计算。如果说我可以将输入的 RDD 拆分为 1000 个分区并且我的机器数量为 100,Spark 会将计算拆分为 1000 个任务,并以某种智能方式将它们动态分配到我的 100 台机器中。

现在假设我最初只能将数据分成 2 个分区,但我仍然有 100 台机器。自然而然,我的 98 机器就闲置了。但是当我处理每个任务时,我可能会将其拆分为可能在不同机器上执行的子任务。它可以通过 queue 在纯 Java 中轻松实现,但我不确定在 Apache Spark 中攻击它的最佳方法是什么。

考虑以下 Java 伪代码:

BlockingQueue<Task> q = new LinkedBlockingQueue<Task>();
q.push(myInitialTask);
...
//On each thread:
while (!queue.isEmpty()) {
    Task nextTask = queue.take();
    List<Task> newTasks = process_task_and_split_to_sub_tasks(nextTask);
    queue.pushAll(newTasks);
} 

假设方法 'process_task_and_split_to_sub_tasks()' 可以将任何大型任务拆分为多个较小的任务,上述 Java 代码将使我所有的 100 个线程保持忙碌。

有没有办法在 Spark 中实现相同的功能,可以与其他工具结合使用吗?


更新:已经正确指出,攻击它的方法之一就是

  1. 生成更细粒度的密钥和
  2. 然后使用智能分区器将这些键分配给分区。

我想这是解决这个问题的“经典”方法,但它需要我能够正确估计每个键的工作量以正确分区它。如果我没有提前了解每个键的工作量的好方法怎么办?当我的大多数机器将保持空闲等待一些不幸的机器时,我可能会以非常不幸的分区告终。

示例:我们以一个简化的频繁项集挖掘为例。
假设我的文件包含字母从 a 到 j 的行(10 个字母),每行中的所有字母都按字母顺序排序并且没有重复,例如'abcf' 任务是找出所有行的 50% 中存在的所有字母组合。例如。如果很多行与模式 'ab.*f' 匹配,则输出将包含 {'a', 'b', 'f', 'ab', 'af', 'bf', 'abf'}。
实现它的一种方法是将所有以“a”开头的行发送到一个映射器(机器),所有以“b”开头的行发送到另一个等等。顺便说一句,这就是frequent pattern mining is implemented in Spark 的方式。现在假设我有 100 台机器(但只有 10 个字母)。然后我的 90 台机器将保持空闲状态。
使用更细粒度的密钥解决方案,我可以生成 10,000 个 4 字母前缀,然后根据每个前缀的估计工作量以某种方式对它们进行分区。但是我的分区可能是非常错误的:如果大多数行以'abcd'开头,那么所有工作都将由负责这个前缀(可能还有其他前缀)的机器完成,再次产生一个当我的大多数机器闲置等待一些不幸的机器时的情况。

在这种情况下,动态负载平衡将是这样的:收到以 'a' 开头的行的映射器可能希望进一步拆分其行 - 以 'ab 开头的行', 'ac', 'ad',... 然后将它们发送到其他 10 台机器,这些机器可能决定进一步将它们的工作拆分为更多任务。
我知道标准的 Apache Spark 没有开箱即用的答案,但我想知道是否有办法实现这一点。

Kafka(即上面的队列)+Spark Streaming 看起来很有前途,你认为我可以通过相对简单的方式使用这些工具来实现动态负载平衡吗?您能推荐其他工具吗?

【问题讨论】:

  • 如果你有 100 台机器和 10 个字母并且可以接受 2 字母前缀处理,那么你将从头开始按 2 字母前缀进行分区。而map only task可以实现完美的数据分布。我们担心的是by 操作,在所描述的场景中这不会成为问题。一旦你计算出分区基数的模式,在非退化的情况下应该是 >> 10。对于其他问题,我们已经建立了模式(例如,使用多步聚合进行加盐)。对我来说,听起来你想要 Akka 风格的微观管理,而不是 Spark。
  • 我同意,更细粒度的分区肯定是一种方法。但并非总是如此。例如,在某些实际的大数据频繁项集示例中,即使按 3 字母前缀进行分区也过于粗糙,因为只有少数最常见的 3 字母前缀可能会生成大部分答案。另一方面,拥有 100,000 个分区(按 5 个字母前缀分割)确实会降低性能。
  • 有 100,000 个分区(按 5 个字母前缀分割)会降低性能 - 你似乎有 wrong idea how partitioner works。键数和分区数是独立的(最佳分区数取决于唯一键数等因素,但没有固定关系)。
  • 好吧,Spark 文档建议每个 CPU 核心有 2-3 个分区,因为每个分区意味着一个具有相关开销的单独任务,请参阅例如Spark documentationStack Overflow discussion 关于这个主题。在使用 Spark 完成类似任务时,我自己也看到过。
  • 重点是?这是管理硬件的经验法则。它与分区数和唯一键数之间的关系无关。

标签: apache-spark spark-streaming load-balancing job-scheduling


【解决方案1】:

现在假设我有 100 台机器(但只有 10 个字母)。收到以“a”开头的行的映射器可能希望进一步拆分其行 - 以“ab”、“ac”、“ad”等开头的行,然后将它们发送到其他 10 台机器。

这不是 Spark 的工作方式。 “映射器”(任务)大多不了解所有分布式上下文。在这个级别,无法访问SparkContext,我们不再拥有RDDs,只需输入本地Iterator 和要在其上执行的代码。它无法启动,也无法创建新任务。

同时,您的问题定义是人为的。要找到频繁的模式,您必须聚合数据,因此您需要 shuffle。在这些点上,对应于给定模式的记录必须被洗牌到同一台机器上。确保数据正确分布是Partitioner 的工作,这里真的没有“拆分”的地方。

【讨论】:

  • 为什么我的问题定义是人为的?我认为问题很明确:99 台机器可能会等待一个映射器在某个单一任务上努力工作。我同意没有直接的解决方案,但这并不意味着解决方案不存在。例如,其中一种解决方案是映射器将添加“ab”、“ac”等前缀作为任务到 Spark Streaming 的输入。
【解决方案2】:

Spark 自己的动态分配可以在一定程度上模拟您想要的,但是如果您需要具有低级控制的详细、高性能方法,那么 Spark 不适合您。对于初学者,您将无法动态拆分任务 - 您只能调整分配给应用程序的整体资源。

您应该考虑低级调度程序并从头开始实施您自己的解决方案。

【讨论】:

  • 感谢您的回答。您能否添加指向文档或其他参考的链接,以便我们跟进you won't be able to dynamically split tasks - you can only adjust overall resources assigned to the application
  • Spark Streaming 也可以做到这一点。我的意思是即使 Spark 不允许以某种直接的方式来实现它,但仍然可能有一些不太明显的方式来实现它。
【解决方案3】:

要归档您的要求,您只需将数据从两个分区重新分区到您想要的任意数量的分区。

https://spark.apache.org/docs/2.2.0/api/java/org/apache/spark/api/java/JavaPairRDD.html#repartition-int-

顺便说一句,火花流与您的问题无关。

请注意,并行度不仅取决于数据集的分区,还取决于我们的工作/算法。

【讨论】:

  • Spark Streaming 可能与以下方式相关:我当前的任务处理器将识别新任务,然后将它们作为全新的任务输入提供给 Spark Streaming(或者,例如,它将从中提供给 Kafka转到 Spark Streaming)。
猜你喜欢
  • 1970-01-01
  • 2017-09-05
  • 2017-06-12
  • 1970-01-01
  • 1970-01-01
  • 2015-10-19
  • 1970-01-01
  • 2021-07-26
  • 2016-09-30
相关资源
最近更新 更多