【问题标题】:Kafka Connect flatten transformation of a postgres record with array fieldKafka Connect 使用数组字段展平 postgres 记录的转换
【发布时间】:2020-08-05 04:35:52
【问题描述】:

我有一个使用 Kafka Connect 连接到 Kafka 的 postgres 数据库,以便将 CDC 事件放在一个主题上。 我们使用扁平化转换作为共享配置的一部分:

flattenKey:  "org.apache.kafka.connect.transforms.Flatten$Key"

表中的一列是 ARRAY 类型,因此在尝试应用转换时出现异常:

Flatten transformation does not support ARRAY for record without schemas (for field after.role_ids).

Kafka 连接参考:https://github.com/a0x8o/kafka/blob/master/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java#L246

我知道数组不能展平,为什么,我的问题是 - 有什么办法可以保持记录的展平,但以某种方式转换/转换数组,以便我仍然可以使用它?

该数组将包含整数,因此将其转换为一个字符串,其中所有元素都用逗号分隔,例如对我有用。 任何其他建议都会很棒。

我们使用 Debezium 进行配置。

【问题讨论】:

  • 你的有效载荷是什么样子的?

标签: postgresql apache-kafka apache-kafka-connect debezium


【解决方案1】:

您需要编写自己的代码来处理这个问题,可以是 custom Single Message Transform,也可以是 Kafka Streams 之类的流处理器。

您还可以在this issue 上发表评论/投票,以将有助于解决此问题的函数添加到 ksqlDB 中。

【讨论】:

    【解决方案2】:

    如果您使用的是 Debezium 1.1 或更高版本,我建议您使用 custom column converter 而不是 SMT。转换器让您可以直接在 Debezium 内部调整架构和值,因此“将其转换为所有元素用逗号分隔的字符串”只需一点编码即可完成。

    【讨论】:

      【解决方案3】:

      在 JDBC 接收器连接器中,数组原语通过票证 https://github.com/confluentinc/kafka-connect-jdbc/pull/805 处理

      【讨论】:

        猜你喜欢
        • 2020-01-03
        • 2019-08-22
        • 2018-11-23
        • 1970-01-01
        • 2021-07-20
        • 1970-01-01
        • 2015-05-19
        • 2020-11-26
        • 2018-08-09
        相关资源
        最近更新 更多