【问题标题】:Multithreaded Kafka Consumer or PerPartition-PerConsumer多线程 Kafka Consumer 或 PerPartition-PerConsumer
【发布时间】:2017-04-20 19:49:33
【问题描述】:

在实施 kafka 消费者时应该有什么更好的方法。

从 Kafka 读取目标并写回 db。数百万行

方法 1: 每个分区 - 每个消费者 - 等待消息消费(即写回数据库)然后在轮询循环中继续下一步。

方法 2: 每个分区 - 每个消费者 - 将记录发送到工作线程或线程池以写回数据库,然后提交偏移量并继续轮询。需要注意偏移管理。在此不要等待消息写回数据库。继续轮询,将消息传递给工作线程。

对他们有什么见解吗?

谢谢

【问题讨论】:

    标签: apache-kafka kafka-consumer-api


    【解决方案1】:

    方法一: 该方法仅适用于您可以估计消息处理时间的情况,否则不推荐。

    问题:在这种方法中,主要问题是让消费者保持活力,如果您在再次调用 poll() 之前等待消息完全处理,则必须确保您的消费者在调用 poll() 之前应该是活动的,因为 kafka 维护一个名为“session.timeout.ms”的属性。 kafka broker/cluster对该属性的值进行操作,如果consumer在“session.timeout.ms”时间内无法再次调用poll(),broker将标记consumer dead并被踢出.现在,当消费者完成消息处理并再次调用 poll() 时,它被认为是一个新的加入者,并将再次像以前一样给出从偏移量开始的记录集。记住这种情况,消费者将陷入无限循环,永远不会进行偏移。

    可能的解决方案 1: 要使用这种方法,您需要具有以下副作用的以下属性“session.timeout.ms”的良好值:

    1:值太低:消费者将如上所述被标记为死亡,并且永远不会继续其偏移量,但是会处理消息,但每次完成消息时,它都会再次获得以前的消息+新消息。

    2:值太高:Broker 将很晚才检测到消费者的真正故障,这将导致记录重复并影响整体吞吐量。

    可能的解决方案 2:(仅对 0.10.1.x 版本有效) Kafka 在版本 (0.10.1.0) 中正式修复。 在这种方法中,引入了两个值得注意的实体:一个新属性“max.poll.interval.ms”,它设置客户端调用 poll() 之间的最大延迟,以及一个负责保持消费者活动的后台线程。因此,在一个场景中,当消费者调用 poll() 方法然后忙于消息处理时,内部后台线程将保持心跳活着,结果消费者将保持活着。但是,这个内部后台线程本身将保持活动状态,直到属性“max.poll.interval.ms”的超时值保持有效。所以,这个线程会等待消费者在“max.poll.interval.ms”的时间段内调用poll(),如果没有,它会发送一个离开请求,也会自己死掉。”

    同样,这个解决方案的棘手部分是找到这个属性的合适值:“max.poll.interval.ms”(非常重要,这个时间将是后台线程在没有需要显式调用 poll())。

    方法 2: 使用工作线程是个好主意,但是您必须维护一个内部队列或验证接收到的消息,这可能很复杂,而且您还需要使用手动提交来对抗自动提交.有关提交的更多信息,请参阅this 并搜索标题“提交和偏移”。

    问题:在这种方法中,主要问题是跟踪收到的消息和成功处理的消息。因为,您的消费者将收到消息,它将将消息传递给相应的工作线程,并将提交偏移量并继续接收更多消息。在此过程中,您必须注意以下问题:

    1. 如果收到消息并提交偏移量,但后来由于某种原因工作线程未能处理消息,现在如何再次获取该消息?
    2. 如果消费者收到消息但没有空闲的工作线程来处理怎么办?

    解决方案: 可以有不同的方法来解决上述问题,一种方法是使用内部队列来保留消息和手动提交,只有当工作线程报告成功时才会发送消息的处理。然而,需要非常小心的实现,因为它可能导致复杂的代码,也可能导致内存管理或线程问题。

    建议:根据您的要求,您可以使用一种方法或另一种方法来解决上述可能出现的问题。但是我会推荐一个更强大的解决方案是使用分区暂停/恢复。您的消费者应该以非常抽象的方式执行以下步骤:

    1: poll() 获取消息。

    2:暂停所有各自的主题/分区。

    3:将消息分配给工作线程并等待其处理。

    4: 继续调用 poll() 但分区暂停时不会收到额外的消息,而消费者将保持活动状态。 (确保在此期间没有注册新主题)

    5:如果所有工作线程都应该报告消息处理成功/失败,则相应地提交偏移量。

    6:恢复所有分区。

    注意:根据您的场景和要求,可能会有更好的方法或其他解决方案。这只是一个想法或可能的解决方案之一。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-04-12
      • 2019-07-23
      • 2023-03-02
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-06-01
      • 2015-08-24
      相关资源
      最近更新 更多