【问题标题】:Apache Beam KafkaIO BatchMode OOM issueApache Beam KafkaIO BatchMode OOM 问题
【发布时间】:2019-12-11 06:53:25
【问题描述】:

我有一个用例,我想使用带有 Spark runner 的 Apache Beam 在批处理模式下从 Kafka 读取数据。

使用KafkaIO类的withMaxNumRecords(long)方法,可以从UnboundedReader生成BoundedReader。但我发现,在批处理模式下,首先从每个分区读取数据,放入内存,然后传递给下一个操作(映射、过滤器等)。

我在每个分区中都有大量数据,在以批处理模式读取这些数据时,我收到OOM 错误。我试图增加执行者内存。但是对于每次运行,我都无法为这个参数配置所需的值。 另一件事是,我可以在流模式下读取相同的数据。

我认为它正在发生,因为在批处理模式下,每个分区的所有记录都分配给GlobalWindow(ProcessContext 的一部分),这会触发仅读取所有数据。这可能是因为OOM 问题。

如果是这个原因,那我如何在 ProcessContext 中将GlobalWindow 更改为PartitioningWindow

如果这不是原因,那么我如何使用 Apache Beam 以批处理模式从 Kafka 读取这些庞大的数据,而不增加每次运行的执行器内存?

【问题讨论】:

  • 你能澄清一下你所说的 executor memory 是什么意思吗?你在使用 Spark 跑步者吗?
  • @JayadeepJayaraman :是的,我正在使用 spark runner。

标签: apache-spark apache-kafka batch-processing apache-beam


【解决方案1】:

来自文档

You can use windowing with fixed-size data sets in bounded PCollections. However, note that windowing considers only the implicit timestamps attached to each element of a PCollection, and data sources that create fixed data sets (such as TextIO) assign the same timestamp to every element. This means that all the elements are by default part of a single, global window.

To use windowing with fixed data sets, you can assign your own timestamps to each element. To assign timestamps to elements, use a ParDo transform with a DoFn that outputs each element with a new timestamp (for example, the WithTimestamps transform in the Beam SDK for Java).

这是一个如何为边界数据集定义窗口的示例 - https://beam.apache.org/get-started/wordcount-example/#unbounded-and-bounded-datasets

此外,从文档中一次读取所有有界数据集并分配一个全局窗口,如下所述

The bounded (or unbounded) nature of your PCollection affects how Beam processes your data. A bounded PCollection can be processed using a batch job, which might read the entire data set once, and perform processing in a job of finite length. An unbounded PCollection must be processed using a streaming job that runs continuously, as the entire collection can never be available for processing at any one time.

我相信要解决您的问题,您可以尝试为有界集合设置一个窗口并尝试在 Cloud Dataflow 上运行它,看看这是否有效。您还可以参考光束能力矩阵,从 Spark Runner 的角度查看支持的内容。

【讨论】:

  • 感谢 Jaydeep 的帮助。关于时间戳,如果你检查 KafkaIO 代码,每条记录都有自己的创建时间戳。因此,并非所有记录都附加相同的时间戳,因为所有记录都属于同一个窗口。当我调试代码时,我发现默认情况下 GlobalWindow 被分配给 ProcessContext 对象。 ProcessContext的outputWithTimestamp()方法用于输出记录,所以所有记录都属于GlobalWindow。
  • 同样在我的用例中,没有聚合操作,并且根据 Beam 文档,只有在管道中的某个地方存在 groupBy() 类型的操作时才会应用窗口操作。所以我不认为尽管我在有界 pcollection 上使用了 window(),但它不会被应用。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2023-03-11
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多