【问题标题】:Apache Kafka consume events from different topics in specific orderApache Kafka 按特定顺序使用来自不同主题的事件
【发布时间】:2019-09-26 03:13:40
【问题描述】:

假设,我有 topicA、topicB 和 topicC,这两个主题都由不同的事件类型隔离,基于域实体。 topicA 仅对 eventA 进行操作,topicB 保留 eventB,topicC 仅对 eventC 进行操作。所有事件都按业务领域相互关联,但由单独的微服务产生,应按特定顺序处理。

问题是,如何使用Apache Kafka按特定顺序引入消费事件,eventA然后等待接收eventB,然后当eventC接收到时将它们全部消费。

感谢任何反馈,欢迎提出任何问题。

一些注意事项: Kafka Streams 是一种很好的方法,但受到公司政策的限制。

另外,我查看了Join Pattern,但没有找到任何可靠的实施方法。

【问题讨论】:

  • Martin Kleppmann 有一篇关于这个主题的精彩文章:martin.kleppmann.com/2018/01/18/event-types-in-kafka-topic.html
  • 我很好奇这些“公司政策”是什么让您可以使用生产者/消费者 API,但不能使用 Streams
  • @SeyedMortezaMousavi 感谢您的精彩文章。我对生产环境中的 kafka 和微服务没有丰富的经验,但是在生产环境中使用 kafka 的实践是什么,我的意思是每个业务实体有几十个主题是否可以,或者每个人都保留少量主题?跨度>
  • 查看confluent.io/blog/… 了解如何选择主题和分区数。

标签: concurrency java-8 apache-kafka spring-kafka


【解决方案1】:

可能有很多方法可以解决这个问题。这里有几个,我可以建议:

  • 引入关联 ID,将主题 A、B 和 C 的事件关联起来。然后,按以下方式使用关联 ID:

    1. 服务 A、B 和 C 对对应的主题产生事件,但相关事件具有相同的关联 ID

    2. 服务 D 使用来自不同主题的事件。每次从任何主题接收事件时,服务 D 要么通过相关 ID 将事件数据插入数据库,要么在收到所有数据时执行一些操作。

    例如,当服务 D 接收到事件 C 时,它首先发出查询以检查数据库中是否存在与事件 C 的相关 ID 的记录:

    • 如果没有记录,则存储传入事件C,
    • 如果某些记录已经存在,则服务 D 会检查事件 C 是否是消耗所有数据所需的最后一个事件,然后执行最终操作,或者将事件 C 插入数据库。

    对于每个消费事件,依此类推。

  • 产生事件的链服务(A、B 和 C)。例如,链可以通过以下方式形成:

    1. 服务 A 向主题 A 产生事件

    2. 服务 B 使用来自主题 A 的事件,并将事件生成到主题 B(可能是聚合事件 A 和 B)

    3. 服务 C 使用来自主题 B 的事件,并将事件生成到主题 C(可能是聚合事件 A、B 和 C)

    4. 最后,服务 D 使用来自主题 C(可能与 A、B 和 C 聚合)的事件并执行所需的操作。

    这种方法的变体(不在每个中间阶段聚合事件)将链接服务并监听链中的最后一个事件。消费完最后一个事件后,向对应的topic发出Kafka pull,获取其他服务产生的事件。

【讨论】:

  • 谢谢你为我工作。我有一个类似的解决方案来使用correlationID,但我们需要使用单个主题来处理此类事件以特定顺序
【解决方案2】:

如果事件彼此相关,那么它们应该转到一个主题。所以 microservice-1 应该使用 (key, value) & label (eventA) 推送 eventA。同样,microservice-2 和 microservice-3 应该将数据推送到一个共同的主题。

这将在消费者方面为您提供帮助。

【讨论】:

  • 有趣的想法。在这种情况下,“标签”是什么意思,messageId?还有一个问题,例如,如果我们有一个共同的主题,但有 4 个分区,以及具有不同 groupId 的 4 个不同的事件消费者......我们将有事件重复,不是吗?
  • 您可以使用 Header (kafka.apache.org/11/javadoc/org/apache/kafka/common/header/…) 来丰富消息。仅供参考,具有相似键的消息将进入同一个分区(类似于 java 中的 hashmap)。
  • 您需要在消费者端手动提交偏移量。您的消费者应该找到带有标签 A 的消息,跳过消息直到找到带有标签 B 的相关消息,跳过消息直到找到带有标签 C 的相关消息。这仅在相关事件具有相似键时才有效。例如,如果事件是“创建账单”、“发送通知”和“收到付款”,则 billId 可以是键。
  • 如果你使用的是 kafka,你必须限制你的主题从不使用多个分区,或者让生产者在发送到经纪人。否则,您的消息可能会被乱序接收和处理。这也将结合 microservice-1、microservice-2 和 microservice-3,在未来可能会限制这些服务中的每一个。
  • 谢谢。当服务由同一团队或同一项目中的团队开发时,这样的解决方案很好。但是当它依赖于不同的项目时,它需要大量的工作。感谢您的澄清
【解决方案3】:

由于您询问的是如何在不同主题之间对消息的消费进行排序,那么第一个选择是让一个消费者产生一条消息,为下一个消费者提供信息(这些消费者可能属于也可能不属于同一流程):

consumerA 处理消息 -> 消费者A 将新消息放在不同的主题上 -> 消费者B 拾取该新消息并处理 -> 消费者B 将新消息放在第二个主题上 -> 等等...等等。

如果流本质上是在做这个或类似的过程,我不会感到惊讶。可以使用任何其他类型的进程间通信接口:RDP、内存映射文件、互斥体、管道;任你选。

除非万不得已,否则我会尽量避免将不同的事件放在同一个主题上。当您将多个事件放在一个队列/主题上时,您可以通过以下几种方式限制您的消费者:

  1. 您的合约现在已针对这两个事件紧密耦合。要仅更改单个主题上的一个事件的形状,您的消费者必须根据元数据(幻数、键值等)动态反序列化这些事件。
  2. 您的消费模式可能效率较低。如果我只对其中一个事件感兴趣怎么办?如果不是我要找的,我必须阅读该事件,然后将其扔掉。

游乐园就是一个真实的例子。假设您有两种类型的游乐园游客:快速通行证和标准客户。您的业​​务规则规定快速通行证客户可以在标准客户之前跳过线路。

如果您将它们合并到一个队列/主题中,您是如何做到的?答案是优先排队;你问每个排队的人他们是否快速通过,这容易出错并且效率低下(这是优先排队;它可以工作,但它可能不是最好的解决方案)。大多数游乐园通过设置两个单独的队列来解决这个问题(每种类型的客户 [事件/消息] 一个队列)。现在他们可以将客户提供给两个单独的服务员(一个 FastPass 一个 Standard),或者他们可能让一个服务员同时处理两个队列,首先清空快速通过队列。

归根结底,这取决于您的限制:是每天 10 条消息还是 10 亿条消息,您需要立即一致性还是最终一致性,是在 IoT 设备上吗?

【讨论】:

  • Streams 确实是为有状态的操作这样做
  • Streams 确实适合这样的操作,但是在开源库中,Streams 有一些对系统至关重要的缺陷。所以你建议有一个单独的主题来保存聚合结果?
  • 如果您已经在使用 topicA、topicB 和 topicC,我认为添加 topicD 和 topicE 不会引入新的约束。是的,我们之前使用单独的主题作为聚合结果。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-03-20
  • 2020-05-08
  • 1970-01-01
  • 2017-05-29
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多