【问题标题】:In Kafka Connector, how do I get the bootstrap-server address My Kafka Connect is currently using?在 Kafka 连接器中,如何获取我的 Kafka 连接器当前使用的引导服务器地址?
【发布时间】:2019-04-22 20:32:08
【问题描述】:

我正在自己开发一个 Kafka Sink 连接器。我的反序列化器是 JSONConverter。但是,当有人将错误的 JSON 数据发送到我的连接器主题时,我想省略这条记录并将这条记录发送到我公司的特定主题。

我的困惑是:我找不到任何 API 来获取我的 Connect 的 bootstrap.servers。(我知道它在 confluent 的 etc 目录中,但编写“connect-”目录的硬代码不是一个好主意Distributed.properties" 来获取 bootstrap.servers)

那么问题来了,我还有其他方法可以方便地在我的连接器程序中获取 bootstrap.servers 的值吗?

【问题讨论】:

  • Connect 的bootstrap.servers 是什么意思?您想要 Kafka Connect 主机还是 Kafka 代理?
  • @GiorgosMyrianthous 为我建立一个 Kafka 生产者的 bootstrap.servers...我认为应该是 Kafka 经纪人
  • 我认为您不能致电 REST Proxy API 来获取 bootstrap.serversYou can only get Brokers' IDs。我建议创建一个包含您的bootstrap.servers.properties 文件,并在生产者启动时加载该文件。
  • 我认为从磁盘读取文件没有任何问题,假设所有 Connect 工作人员都已安装...事实上,这就是新密码隐藏功能的工作原理
  • @cricket_007 如果文件名因环境不同而不同怎么办?即 etc/dev/dev-connect-distributed.properties 和 etc/prd/prd-connect-distributed.properties

标签: apache-kafka apache-kafka-connect


【解决方案1】:

与其尝试将“坏”记录从 SinkTask 发送到 Kafka,不如尝试使用 Kafka Connect 2.0 中添加的死信队列功能。

您可以将 Connect 运行时配置为自动将处理失败的记录转储到充当 DLQ 的配置主题。

更多详情,请查看添加此功能的KIP

【讨论】:

  • 值得指出 - Connect 可以独立于代理进行升级
  • @cricket_007 我们公司现在使用 Confluent4.0.1 和 Kafka 1.0.1,我们可以在继续使用 Kafka 1.0.1 集群的同时将 Confluent4 升级到 Confluent5 吗?
  • @Sky 你应该可以。自 Confluent 3.0 以来的代理支持 Connect 工作人员的较新 Confluent 版本
猜你喜欢
  • 1970-01-01
  • 2021-08-28
  • 2020-01-18
  • 1970-01-01
  • 2018-05-01
  • 2019-02-24
  • 2018-04-14
  • 2011-10-03
  • 2021-06-03
相关资源
最近更新 更多