【问题标题】:How to transform rename fields for different topics using SMT如何使用 SMT 为不同主题转换重命名字段
【发布时间】:2021-07-17 23:53:13
【问题描述】:

我有多个主题需要重命名它显示的架构状态中的字段Caused by: org.apache.kafka.connect.errors.ConnectException: Table: topic2 is missing field ([SinkRecordField{schema=Schema{STRING}, name='stand_user', isPrimaryKey=false}]

所以我的配置是这样的

"topics":"topic1, topic2",
"transforms":"RenameField",
"transforms.RenameField.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameField.renames":"user:stand_user, user:session_user",

stand_user 是 topic1 的一部分,但 session_user 是 topic2 的一部分 现在 topic2 出错了,因为它无法处理stand_user,即使stand_user 应该只在 topic1 中。我试着把它分开

"topics":"topic1, topic2",
"transforms":"RenameTopic1Field, RenameTopic2Field",
"transforms.RenameTopic1Field.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameTopic1Field.renames":"user:stand_user",
"transforms.RenameTopic2Field.type":"org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.RenameTopic2Field.renames":"user:session_user",

但是随后重命名中的下一个字段失败了同样的错误 任何将主题重命名分开的建议

【问题讨论】:

    标签: apache-kafka apache-kafka-connect


    【解决方案1】:

    转换适用于配置中的所有主题

    如果您无法将特定转换应用于该连接器使用的单个主题,它将阻止连接器读取所有其他主题,因此您必须创建一个单独的配置来隔离所需的行为

    【讨论】:

    • 这是否意味着我无法将转换分离为 RenameTopic1Field、RenameTopic2Field。为了满足 SMT,我总是必须创建另一个连接器?不会有很多连接器吗?这是因为我在第一个主题中只重命名了一个字段,而我为第二个主题有另一个重命名字段。我不能将两个重命名字段放入一个配置中。为什么连接器说它正在运行并且消息能够消费然后连接器在一天左右后停止
    • 如前所述,转换适用于所有主题。您不能针对某些字段中的单个字段。更多的连接器将启动更多的消费者线程,但它会更有弹性地应对故障。连接器不会自行重启,所以我不确定我是否理解最后一个问题
    • 如果我们的连接器较少,排除故障不是更好吗?这是因为我试图将更多主题放入 1 个连接器,而 SMT 则放入同一连接器中的所有主题,因为它使用相同的数据库连接 url。也可以这样分发。我知道转换适用于所有主题,但我看到当我添加一个新主题时,我会像原来的 topic1 一样受到影响。然后主题 2 将没有消息进入,或者同一连接器中的所有主题都停止接收消息任何想法
    • 如已回答,如果转换为一个主题引发异常(例如无法找到某个字段),则同一配置中的所有主题都将停止使用。因此,具有单个主题的单独配置更容易排除故障,因为一个连接器 /status 端点上只有一个主题会显示为不健康
    猜你喜欢
    • 2020-11-20
    • 2021-10-17
    • 2020-06-26
    • 2021-05-02
    • 2011-04-26
    • 2021-02-15
    • 1970-01-01
    • 2020-12-01
    • 2019-06-20
    相关资源
    最近更新 更多