【问题标题】:Flink workflow parallelism with custom sourceFlink 工作流并行与自定义源
【发布时间】:2020-08-31 09:27:18
【问题描述】:

我在 Flink 中构建了一个工作流,它由一个自定义源、一系列地图/平面图和一个接收器组成。

我的自定义源的 run() 方法遍历存储在文件夹中的文件,并通过上下文的 collect() 方法收集每个文件的名称和内容(我有一个自定义对象来存储这个两个字段中的信息)。

然后我有一系列地图/平面图来转换这些对象,然后使用自定义接收器将这些对象打印到文件中。在 Flink 的 Web UI 中生成的执行图如下:

我有一个集群或 2 个工作人员设置为每个有 6 个插槽(它们也都有 6 个内核)。我将并行度设置为 12。从执行图中我看到源的并行度为 1,而工作流的其余部分的并行度为 12。

当我运行工作流程时(我在专用文件夹中有大约 15K 文件),我使用 htop 监控我的工作人员的资源。在大多数情况下,所有内核的利用率都达到 100%,但大约每 30 分钟左右,8-10 个内核就会空闲大约 2-3 分钟。

我的问题如下:

  1. 我了解源运行时具有并行度 1,我认为从本地存储读取时这是正常的(我的文件位于每个工作人员的同一目录中,因为我不知道将选择哪个工作人员来执行源)。真的正常吗?你能解释一下为什么会这样吗?

  2. 我的工作流程的其余部分以并行度 12 执行,这看起来是正确的,因为通过检查任务管理器的日志,我从所有插槽(例如,.... [Flat Map -> Map -> Map -> Sink: Unnamed (**3/12**)] INFO ........ [Flat Map -> Map -> Map -> Sink: Unnamed (**5/12**)] INFO .... 等)中获得了打印结果。我不明白的是,如果一个插槽正在执行源角色并且我的集群中有 12 个插槽,那么 12 个插槽如何执行其余的工作流?一个插槽是否同时代表工作流其余部分的源和一个实例?如果是,如何分配此特定插槽的资源?有人可以解释此工作流程中的步骤吗?例如(这可能是错误的):

  • 插槽 1 读取文件并将它们转发到可用插槽(2 到 12)
  • 插槽 1 将一个文件转发给它自己并停止读取,直到它完成它的工作
  • 完成后,插槽 1 读取更多文件并将它们转发到可用的插槽

我认为我上面描述的内容是错误的,但我将其作为示例来更好地解释我的问题

  1. 为什么我的大多数内核每 30 分钟(或多或少)就会出现这种空闲状态,持续大约 3 分钟?

【问题讨论】:

    标签: parallel-processing apache-flink


    【解决方案1】:

    要回答有关并行读取的具体问题,我将执行以下操作...

    1. 通过扩展 RichSourceFunction 来实现您的自定义源。
    2. 在您的open() 方法中,调用getRuntimeContext().getNumberOfParallelSubtasks() 获取总并行度,调用getRuntimeContext().getIndexOfThisSubtask() 获取正在初始化的子任务的索引。
    3. 在您的run() 方法中,当您遍历文件时,获取每个文件名的hashCode(),以总并行度为模。如果这等于您的子任务的索引,那么您处理它。

    通过这种方式,您可以将工作分散到 12 个子任务中,而无需让子任务尝试处理同一个文件。

    【讨论】:

      【解决方案2】:
      1. 单一消费者设置将管道的整体吞吐量限制为唯一一个消费者的性能。此外,它为所有插槽引入了重度洗牌 - 在这种情况下,消费者读取的所有数据也会在此消费者插槽上序列化,这是额外的 CPU 负载。相比之下,让消费者并行度等于地图/平面地图并行度将允许链接源 -> 地图操作并避免洗牌。
      2. 默认情况下,Flink 允许子任务共享槽,即使它们是不同任务的子任务,只要它们来自同一个作业。结果是一个槽可以容纳整个工作流水线。因此,在您的情况下,插槽 1 具有消费者和地图/平面地图任务,而其他插槽只有地图/平面地图任务。有关详细信息,请参见此处:https://ci.apache.org/projects/flink/flink-docs-release-1.10/concepts/runtime.html#task-slots-and-resources。此外,您实际上可以在 Web UI 上查看每个子任务的实例。
      3. 您是否启用了检查点?如果是并且如果是 30 分钟,那么这可能是状态被快照的时间间隔。

      【讨论】:

      • 感谢您的回复。关于 1)我猜这是使用访问本地存储的源的限制,对吧?不是说我可以通过设置一个参数来增加它的并行度吗?如果我理解正确,如果我使用 Kafka 存储对象(每个插槽订阅一个分区),我可以改进。但是使用本地存储,这是不可能的,对吧?关于 2) 这些子任务是否并行执行?如果我的消费者在特定时间点正在处理其中一个地图/平面图子任务,它会并行地将其余部分作为源服务还是在完成之前被阻止?
      • 1) 老实说,我没有使用本地存储,但假设您在每个工作人员上都有类似的文件,如果可能的话,您可以通过将数据分区到不同的子文件夹来并行化源。例如,或者像根据子任务索引为每个子任务分配它自己的工作和平。 2)是的,这些子任务是并行执行的,每个子任务都在自己的线程中。除了内存缓冲区之外,这些线程不会互相阻塞。
      • @MikalaiLushchytski 很好地回答了所有问题。我只是在 1) 中补充一点,您应该明确地以 12 的并行度运行它。根据使用的 InputFormat 和文件,Flink 能够将大文件拆分成更小的块(~32 MB),甚至可以能够并行处理单个大文件 12. 拥有多个文件使得并行化更加容易。
      • @ArvidHeise 感谢您的回复。我在运行时将并行度设置为 12。 Flink 将并行度 1 分配给源,将 12 分配给其余部分。通过检查 API,我发现:“默认情况下,源的并行度为 1。要启用并行执行,用户定义的源应实现 org.apache.flink.streaming.api.functions.source.ParallelSourceFunction 或扩展 org. apache.flink.streaming.api.functions.source.RichParallelSourceFunction。”如果我有多个本地存储的消费者,Flink 将如何判断哪些文件已经被其他人消费了?
      • Flink 添加了一个ContinuousFileMonitoringFunction,其并行度为 1,用于进行发现和拆分,并添加了一个ContinuousFileReaderOperator,其并行度仅用于获取拆分。通过检查点恢复期间也避免了双重处理(在重新启动时会记住拆分)。
      猜你喜欢
      • 2020-11-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-05-18
      • 1970-01-01
      • 2019-06-13
      • 2013-02-04
      • 2018-12-13
      相关资源
      最近更新 更多