【问题标题】:Kafka consumer design to process huge volume of data with multi instanceKafka 消费者设计以处理具有多实例的大量数据
【发布时间】:2021-06-22 15:08:33
【问题描述】:

我正在尝试设计 Kafka 消费者,但我在如何设计流程方面遇到了障碍。我正在考虑两种选择:

1.  Process records directly from Kafka.
2.  Staging table write from Kafka and process records.

方法 1: 随时随地处理来自 Kafka 的关键消息:

•   Read messages one at a time from Kafka & if no records to process break the loop (configurable messages to process)
•   Execute business rules.
•   Apply changes to consumer database.
•   Update Kafka offset to read after processing message.
•   Insert into staging table (used for PD guide later on)

上述方法的问题:

•   Is it OK to subscribe to a partition and keep the lock open on Kafka partition until configurable messages are processed
    and then apply business rules, apply changes to database. All happens in the same process, any performance issues doing this way ?
•   Is it OK to manually commit the offset to Kafka? (Performance issues with manual offset commit).

方法 2: 从 Kafka 写入暂存表并处理记录

Process 1: Consuming events from Kafka and put in staging table.
Process 2: Reading staging table (configurable rows), execute business rules, apply consumer database changes 
& update the status of processed records in staging table. (we may have multiple process to do this step)

我认为这种方法有很多缺点:

•   We are missing the advantage of offset handling provided by Kafka and we are doing manual update of processed records in staging table.
•   Locking & Blocking on staging tables for multi instance, as we are trying to insert & do updates after processing in the same staging table 
    (note: I can design separate tables and move this data there and process them but that could is introducing multiple processes again.

如何设计具有多实例消费者和要处理的大量数据的 Kafka,哪种设计是合适的,从 Kafka 中读取数据并处理消息或将其暂存到表中并编写另一个作业来处理是否好这些消息?

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    使用数据并相应更新表的第一种方法听起来是正确的方法。

    Kafka 保证

    至少一次:您可能会收到两次相同的消息。
    这意味着您需要消息是幂等的 -> 将数量设置为 x 而不是将数量添加到之前的值。

    顺序(每个分区):Kafka 承诺您使用消息的顺序与生成消息的顺序相同 - 每个分区。就像每个分区的队列一样。
    如果当你说“执行业务规则”时,你还需要读取之前的写入,那就意味着你需要一个一个地处理它们。

    如何定义分区

    如果您定义一个分区,您将不会遇到冲突问题,但您将只有一个消费者并且无法扩展。
    如果你随意定义多个分区,那么你可能会失去顺序。
    为什么会有这样的问题?
    您需要根据您的业务模型定义分区: 例如,假设每条消息都会更新某个用户的数据库。当您处理要读取用户行的消息时,检查一些字段,然后根据该字段更新(或不更新)。
    这意味着如果您通过用户 ID 定义分区 -> (用户 ID % 分区数)
    您保证不会在同一用户的两次更新之间出现竞争条件,并且您可以扩展到多台机器/进程/线程。每个消费者负责一组用户,但始终是相同的用户。

    【讨论】:

      【解决方案2】:

      消费者的设计取决于您的用例。 如果有其他下游进程需要相同的数据并且连接到您的 kafka 集群有限制。在这种情况下,有一个临时表是个好主意。

      我认为在您的情况下,方法 1 稍作改动是一个不错的方法。 但是,如果主题中没有新消息,则不需要中断循环。 此外,还有一个 consumer 属性可以帮助配置您希望在单个请求中从 kafka 轮询的记录数(默认为 500),如果每条消息需要很长时间来处理,您可能希望将其更改为较低的数字(To避免超时或不必要的重新分区问题)。

      由于您提到数据量很大,如果处理顺序对您来说不重要,我建议您使用更多的并发分区。并发可以通过创建一个实例数不超过主题分区数的消费者组来实现。 (如果消费者实例数大于分区数,则额外的实例将是理想的)

      如果顺序确实很重要,生产者应该理想地发送具有相同消息键的逻辑分组消息,以便具有相同键的所有消息都落在同一个分区中。

      关于偏移提交,如果你同步提交每条消息到 kafka,你肯定会对性能产生影响。通常为每批消费的记录提交偏移量。例如轮询 500 条记录-> 处理-> 提交这批记录。 但是,如果您需要为每条消息发送提交,您可能需要选择异步提交。

      此外,当分区分配给消费者组实例时,它不会锁定分区。其他消费组可以订阅同一个主题,并发消费消息。

      【讨论】:

        【解决方案3】:

        这就是我认为我们可以在不担心消息丢失的情况下获得最佳吞吐量的方式-

        1. 最大化分区数。
        2. 部署消费者(最大分区数,如果您的消费者可以毫无问题地运行多线程,则更少。)
        3. 从每个消费者中单线程读取(使用自动偏移提交)并将消息放入阻塞队列中,您可以根据每个消费者中的实际处理线程数来控制该队列。
        4. 如果处理失败,您可以重试成功或将消息放入死信队列。不要忘记执行关闭连接以处理已使用的消息。
        5. 如果您想确保处理事件的顺序是一个接一个地使用相同的键或在单个分区的任何其他因素上处理事件,您可以使用确定性执行器。我用 Java 编写了一个基本的 ExecutorService,它可以确定性地执行多条消息,而不会影响逻辑上独立事件的多线程。链接-https://github.com/mukulbansal93/deterministic-threading

        回答你的问题-

        1. Is it ok to subscribe to a partition and keep the lock open on Kafka partition until configurable messages are processed and then apply business rules, apply changes to database. All happens in the same process, any performance issues doing this way? 我在这里看不到太多性能问题,因为您正在批量处理。但是,您使用的其中一条消息可能需要很长时间,而其他消息则需要处理。在这种情况下,您将不会从 Kafka 读取其他消息,从而导致性能瓶颈。
        2. Is it ok to manually commit the offset to Kafka? (Performance issues with manual offset commit). 这绝对是吞吐量最低的方法,因为偏移提交是一项昂贵的操作。

        【讨论】:

        • 感谢 Mukul,我有 5 个单独的消费者实例,而不是在 BlockingQueue 中实现单个实例中的多个线程。所以,动态读取和处理事件(实时处理)比在表中暂存(冷数据处理)并使用单独的批处理来处理暂存表中的这些事件更好吗?另外,如果我有多个实例,我如何确保关键消息的顺序,我有分区键,所以所有具有相同键的消息都将进入同一个分区..
        • 1.运行多个消费者都很好。一个消费者将消费和处理来自 1 个或多个分区的消息。因此,如果您还想在一个消费者中处理多个事件,您应该继续使用阻塞队列方法。如果不需要在一个消费者实例中处理多个事件,则可以跳过。
        • 2.如果您将未处理的消息重新发布回未处理的 Kafka 队列甚至是同一个队列,则可以避免暂存表。然后,您的消费者可以稍后重新选择它们。大多数这样做的人在消息本身内部都有一个计数器,表示已尝试对事件进行处理的次数。我不反对使用 DB 表,但如果可以避免,我更喜欢它,因为它降低了总体成本,并从架构中增加了一个可管理的组件。
        • 3.你是对的,相同key的消息默认会到同一个partition,被同一个consumer instance拾取。
        猜你喜欢
        • 2015-08-06
        • 2017-11-17
        • 2020-09-19
        • 2016-06-06
        • 1970-01-01
        • 2021-04-24
        • 2017-11-19
        • 2020-07-20
        • 1970-01-01
        相关资源
        最近更新 更多