【问题标题】:kafka connect transformations ordering guaranteeskafka 连接转换订购保证
【发布时间】:2020-06-03 03:40:17
【问题描述】:

我们计划使用 JMS 源连接器将数据流式传输到我们的 Kafka 集群。 ActiveMQ 的数据是 XML 格式的。 JMS Source Connector 使用内部 messageID (Message.getJMSMessageID()) 作为键。

作为键的字段 - 在连接器流向的 Kafka 主题上 - 需要从 (XML) 有效负载中检索。

要实现这一点,在连接器中需要几个步骤。

  • 要将 XML 转换为内部 Kafka Connect 结构,我们使用自定义转换插件 (https://github.com/jcustenborder/kafka-connect-transform-xml)
  • 然后 ValueToKey 和 ExtractField 转换器设置作为有效负载一部分的密钥。
  • 现在此键值对已准备好发送到我们的 Kafka 主题。

我们正在处理金融交易,需要保证消息的顺序。 我们有很高的吞吐量,据我了解,配置 tasks.max 允许通过在 Kafka Connect Worker 之间分配任务来实现并行性。

第一个问题:并行性如何与单个消息转换器结合使用? '(Source)Connector - Transformer - Converter' 是否形成一个通过设置 tasks.max 一起分布的管道,或者 tasks.max 设置是否仅适用于连接器?

后者似乎有点奇怪,所以假设前者是正确的,我还有一个疑问。

我们的 Kafka 主题键 - 将保证 Kafka 主题的顺序 - 在连接器的任务中确定。选择 tasks.max > 1 传入的消息分布在正在运行的任务中。

分布在多个任务之间,两条(或更多)消息(在有效负载中包含相同的密钥)以特定顺序从 ActiveMQ 到达,并且可以发送到不同的 Kafka Connect 任务。

理论上,当最终流式传输到 Kafka 主题时,顺序可以颠倒(在同一个分区上,因为它们现在具有相同的键)。

我这样推理是否正确,是否有办法规避这种情况?或者仅在此用例中仅使用一项任务才有可能获得订购保证。

【问题讨论】:

    标签: apache-kafka apache-kafka-connect


    【解决方案1】:

    tasks.max 设置是否仅适用于连接器?

    这个

    我们的 Kafka 主题密钥 - 这将保证 Kafka 主题的顺序

    不,它没有。它只保证分区,仅此而已

    是仅在此用例中仅使用一项任务才有可能的订购保证。

    这取决于来源。我不知道AMQ,但是如果读取消息将其从队列中删除,那么多个任务就不可能得到消息

    【讨论】:

    • 也许我之前并不清楚。基于有效负载中的字段(例如,某些字段的值为“A”)的 ActiveMQ 消息的相对顺序需要在 Kafka 主题上维护。相同的键意味着主题上的相同分区。当 Kafka Connect 并行运行 (tasks.max>1) 并且进入不同 Kafka Connect Worker 线程的 ActiveMQ 记录可以相互超越时,就会出现疑问。我看到的唯一订购保证是 tasks.max=1
    【解决方案2】:

    并行性如何与单个消息结合使用 变压器?

    您的第一个答案是正确的 - 转换器按照连接器配置中定义的顺序在每个正在运行的任务中执行。一个任务生成的每一个SourceRecord,都会被同一个任务中的所有Transformer处理,然后发送给Kafka。

    我这样推理是否正确,是否有办法规避这种情况?还是仅在此用例中仅使用一项任务才有可能提供订购保证。

    保证消息排序的最简单方法是使用单个任务,但这显然无法扩展。有几种方法可以解决这个问题。

    1. 对任务读取的消息进行分区,以便每个任务始终使用相同的键读取所有消息。一些 Message Queue 服务器内置了对此的支持。例如,ActiveMQ 支持Selectors。在这种情况下,您可以让每个任务在以下情况下只读消息:

    <message ID> MOD <Number of tasks> == <Task ID>

    这实现起来并非易事(例如,您需要在运行时处理任务数量的变化),但却是可行的。

    1. 根据 ID 将原始消息划分到不同的 ActiveMQ 队列中。所以现在你为一个 ActiveMQ 队列获得了一个 Connect 任务。

    2. 使用 Kafka Streams 对消息进行排序。基本上,您将从 ActiveMQ 读取消息并使用 Kafka Connect 将它们写入主题“未排序消息”。一个单独的 Kafka Streams 应用程序将从“未排序消息”主题中读取,并将排序后的数据写入“排序消息”主题。这在这里讨论:Apache Kafka order windowed messages based on their value

    【讨论】:

      猜你喜欢
      • 2019-04-28
      • 1970-01-01
      • 2017-12-04
      • 1970-01-01
      • 2017-01-07
      • 2018-02-18
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多