【问题标题】:Intuition for setting appropriate parallelism of operators in Flink在 Flink 中设置适当的算子并行度的直觉
【发布时间】:2022-06-09 02:58:49
【问题描述】:

我的问题是关于在固定集群设置中的 flink 作业中为操作员了解并行性的良好选择。假设我们有一个 flink 作业 DAG,其中包含 mapreduce 类型的运算符,它们之间有流水线边(没有阻塞边)。 DAG 示例如下:

Scan -> Keyword Search -> Aggregation

假设一个由M 机器组成的固定大小的集群,每个具有C 核心,并且DAG 是在集群上运行的唯一工作流。 Flink 允许用户为各个算子设置并行度。我通常为每个运算符设置M*C 并行度。但从性能角度(例如执行时间)来看,这是最佳选择吗?我们能否利用运营商的特性做出更好的选择?例如,如果我们知道aggregation 更昂贵,我们是否应该只将M*C 并行度分配给aggregation 运算符并减少其他运算符的并行度?这也有望减少背压的机会。

我不是在寻找一个合适的公式来给我“最好的”并行性。我只是在寻找某种可以用来做出决定的直觉/指南/想法。令人惊讶的是,我找不到太多关于这个主题的文献可供阅读。

注意:我知道最近 Flink 中的动态缩放响应模式。但是我的问题是关于一个只运行一个工作流的固定集群,这意味着动态扩展是不相关的。我查看了this 的问题,但没有得到答案。

【问题讨论】:

    标签: apache-flink distributed-computing flink-streaming distributed-system


    【解决方案1】:

    我对此的看法略有不同。在我看来,有两个关键问题需要考虑:

    (1) 我想保持插槽统一吗?或者换句话说,每个插槽都会有每个任务的实例,还是我想调整特定任务的并行度?

    (2) 每个插槽有多少个核心?

    我对 (1) 的回答默认为“保持统一”。我还没有看到很多情况证明调整单个运算符(或任务)的并行性是值得的。

    如果更改并行度意味着破坏操作员链,则通常会适得其反。无论如何,在不寻常的情况下,在洗牌的地方做这件事是有意义的,但总的来说,我不明白这一点。既然一些槽将有每个操作符的实例,并且槽都是统一的,那么为什么分配给它们的任务更少的槽会有帮助呢? (在这里,我假设您对设置插槽共享组的麻烦不感兴趣,这当然可以做到。)从操作的角度来看,沿着这条路走下去会使事情变得更加复杂,而且收效甚微。在我看来,最好在其他地方进行优化(例如,序列化)。

    至于每个插槽的核心数,许多工作受益于每个插槽有 2 个核心,而对于一些具有大量任务的复杂工作,您可能希望做得更高。因此,我认为对于简单 ETL 作业的整体并行度为 M*C,而对于执行更密集的作业,M*C/2(或更低)。

    为了说明极端情况:

    一个简单的 ETL 作业可能类似于

    source -> map -> sink
    

    其中所有连接都是转发连接。由于只有一个任务,并且因为 Flink 每个任务只使用一个线程,所以在这种情况下,我们每个插槽只使用一个线程。因此,为每个插槽分配一个以上的内核是完全浪费的。无论如何,该任务可能是 i/o 绑定的。

    在另一个极端,我见过涉及约 30 个连接、一个或多个 ML 模型的评估以及窗口聚合等的作业。您当然希望有多个 CPU 内核来处理这样的作业的每个并行切片(并且超过两个,就此而言)。

    通常,大部分 CPU 工作都用于序列化和反序列化,尤其是 RocksDB。我会尝试弄清楚,对于每个事件,涉及多少 RocksDB 状态访问、keyBy 和重新平衡 - 并提供足够的内核以使所有这些 ser/de 可以同时发生(如果您关心最大化吞吐量)。对于最简单的工作,一个核心可以跟上。等到你得到像窗口连接这样的东西时,你可能已经在推动一个核心可以跟上的极限——这取决于你的源和接收器可以走多快,以及你对不浪费资源的谨慎程度。

    示例:假设您在 50 并行度(每个插槽 2 个内核)或 100 并行度(每个插槽 1 个内核)之间进行选择。在这两种情况下,都可以使用相同的资源——哪个会更好?

    一般而言,我希望每个插槽有更多内核的插槽更少,性能会更好一些,前提是每个插槽有足够的任务/线程来保持两个内核都很忙(如果整个管道适合一个任务,这可能不是真的,尽管反序列化器可以也在他们自己的线程中运行)。使用更少的插槽,每个插槽将拥有更多的键和键组,这将有助于避免数据倾斜,并且使用更少的任务,检查点(如果启用)会表现得更好。进程间通信也更有可能采用优化的(内存中)路径。

    【讨论】:

    • 谢谢@大卫。您对简单 ETL 作业与复杂作业的并行性的想法是一个值得考虑的有趣点。你能举一些例子吗?具体来说,根据您的经验,您能否举一个简单的工作示例,M*C 就足够了,以及我们可能必须设置并行度为M*C/2 的工作示例,为什么?这个推理可能对我有帮助。
    • @AvinashK我扩大了我的答案;希望有帮助。
    • FWIW,我通常会反过来考虑这个问题——换句话说,我需要为每个插槽提供多少个内核才能使所有并行管道在给定并行度下以最大吞吐量运行?与“我可以用这个硬件实际实现的最高并行度是多少?”相反,它是“我需要多少硬件来实现我想要的并行度?”。
    • 我试图理解你的陈述You certainly want more than one CPU core handling each parallel slice of a job like that。我有一个可能非常基本的问题。假设我没有与任何磁盘或 RocksDB 后端交互,并且容错功能已关闭。假设为一个复杂的管道提供 2 个内核会为该管道产生T 的吞吐量。但是,如果我只给一个管道提供 1 个核心但创建两个管道,则每个管道的吞吐量将为 T/2。总吞吐量仍为T。为什么在第一种情况下总吞吐量会更高?在第一种情况下,那个因素是什么?
    • 我添加了几段来解决这个问题。
    猜你喜欢
    • 1970-01-01
    • 2019-07-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-10-21
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多