【问题标题】:How to process events which are out of order using Kafka Streams如何使用 Kafka Streams 处理无序的事件
【发布时间】:2019-09-03 08:25:15
【问题描述】:

我有一个应用程序,其中基于用户登录、用户的中间操作(可选)和用户注销等用户操作在 Kafka 主题上发送事件。每个事件在事件对象中都有一些信息以及 userId ,例如登录事件有 loginTime;添加注释有注释(中间操作)。类似地,注销事件具有 logoutTime。要求是在收到每个用户的注销事件后将所有这些事件的信息聚合到一个对象中并将其发送到下游。

由于某些原因(网络延迟,多个事件生产者)事件可能没有按顺序出现(用户注销事件可能先于中间事件),那么问题是如何处理这种情况?收到用户注销事件后,我不能等待中间事件,因为中间事件是可选的,具体取决于用户的操作。

我认为这里唯一的选择是在接收到用户注销事件后等待一段时间,如果在该等待时间内接收到中间事件并发送已处理的事件,则处理中间事件,但再次不确定如何实现这一点。

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    正如在其他答案中所说,在 Kafka 中,订单是按分区维护的。

    既然您在谈论用户事件,为什么不将 UserID 作为您的 Kafka 主题键?因此,与特定用户相关的所有事件将始终被排序(假设它们由单个生产者生成)。

    您应该确保(根据设计)只有一个 Kafka 生产者将所有用户更改事件推送到给定主题。通过这种方式,您可以避免由于多个生产者而导致的乱序消息。

    从流中,您可能还想查看 Kafka 流中的 Windows。 Tumbling windows 例如是非重叠和固定大小。您在一段时间内汇总记录。

    现在您可能希望按时间戳(或者您说您有注销时间、登录时间等)对聚合进行排序并采取相应措施。


    简单有效的解决方案

    使用同步发送并将delivery.timeout.msretries设置为最大值。 为确保容错设置acks=allmin.insync.replicas=2(主题配置)并使用单个生产者推送到该主题。 您还应该将 max.block.ms 设置为某个最大值,以便在获取元数据时出现错误(例如,当 Kafka 关闭时),您的 send() 不会立即返回。

    用您的速率对同步发送进行基准测试,并检查它是否符合您的要求或基准数。

    这样可以确保先发送的消息首先发送到 Kafka,然后在成功确认前一个消息之前不会发送下一个消息。

    如果未达到您的基准值,请尝试背压 内存/持久队列等机制。

    1. 将事件添加到 Thread-1 中的队列
    2. 从 Thread-2 的队列中窥视(非出列)事件
    3. 在 Thread-2 中调用 producer.send(...).get()
    4. 将 Thread-2 中的事件出列

    【讨论】:

      【解决方案2】:

      Kafka 不保证topic 上的订单,它保证partition 上的订单。一个主题可以有多个分区,因此每个消费主题的消费者都会消费一个分区。这就是 kafka 实现可扩展性的方式。所以你遇到的是正常行为(它不是错误或与网络延迟或类似的东西有关)。您可以做的是确保您要按顺序处理的所有消息都发送到同一个分区。您可以通过将分区数设置为 1 来做到这一点,这是最愚蠢的方式。当您向生产者发送消息时,默认情况下,kafka 会查看密钥,对其进行哈希处理,并通过该哈希知道应该在哪个分区上发送消息。您可以确保所有消息的密钥都是相同的。这样一来,所有键的哈希值都是相同的,所有消息都将进入同一个分区。此外,您可以实现自定义分区器并覆盖 kafka 如何选择分区消息的默认方式。这样,所有消息都会按顺序到达。如果您无法执行任何此类操作,那么您将收到乱序的事件,您将不得不考虑一种如何乱序消费它们的方法,但这与 kafka 无关。

      【讨论】:

      • but that is not question related to kafka -- 为什么不呢?这绝对是相关的!
      【解决方案3】:

      如果您无法保留事件顺序(注销将是最后一个事件), 您可以使用来自 Kafka Streams 的 ProcesorApi 来满足您的要求。 Kafka Streams DSL 可以与处理器 API 结合使用(更多详情here)。

      您可以有多个分区,但特定用户的所有事件都必须发送到同一个分区。

      您必须实现自定义处理器/变压器。 您的处理器将把每个事件/活动放在状态存储中(聚合来自特定用户的所有事件在同一个键下)。 处理器 API 使您能够创建某种 调度程序 (Punctuator)。 您可以安排每 X 秒检查特定用户的事件。如果注销很久以前,您将获得所有事件/活动并进行一些聚合并将结果发送到下游。

      【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2017-01-07
      • 2022-08-19
      • 1970-01-01
      • 1970-01-01
      • 2018-08-25
      • 1970-01-01
      • 2017-07-28
      • 1970-01-01
      相关资源
      最近更新 更多