【问题标题】:Kafka-connect add more topics on the flyKafka-connect 即时添加更多主题
【发布时间】:2019-08-13 14:43:33
【问题描述】:

我有一个 elasticsearch kafka-connect connector 正在使用一些主题。
配置如下:

{
    connection.url": "https://my-es-cluster:443",
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.ignore": "true",
    "topics": "topic1,topic2",
    ...
}

我可以在它运行时添加更多主题吗?
会发生什么?
如果我从列表中删除一些主题并稍后再次添加它们会怎样。

我想在这里添加一个新的topic3

{
    ...
    "topics": "topic1,topic2,topic3",
    ...
}

什么是我删除topic2?是否会重新消费其他主题?:

{
    ...
    "topics": "topic1,topic3",
    ...
}

【问题讨论】:

    标签: apache-kafka kafka-consumer-api apache-kafka-connect


    【解决方案1】:

    由于您已经运行了 kafkakafka-connect,您可以使用 kafka-connect 的 REST API 并自行检查:https://docs.confluent.io/current/connect/references/restapi.html

    如果您添加一个新主题 (topic3),当前在该主题中的所有消息(根据保留策略)都将被使用。

    PUT http://kafka-connect:8083/connectors/my-test-connector/config
    {
       ...
       "topics": "topic1,topic2,topic3",
       ...
    }
    

    检查此连接器的状态和配置:

    GET http://kafka-connect:8083/connectors/my-test-connector
    

    如果您想禁用某些主题,只需使用 PUT 更新该连接器的配置。

    PUT http://kafka-connect:8083/connectors/my-test-connector/config
    {
       ...
       "topics": "topic1,topic3",
       ...
    }
    

    topic1topic3 不会有任何变化。只是topic2 不会再被消费了。
    但是,如果您想将其返回,来自topic2 的消息将从最后提交的偏移量开始使用,而不是从开始处开始。

    对于每个上次提交的consumer group 存储offset,您从配置中删除主题一段时间都没关系。 对于这种情况,消费者组将是connect-my-test-connector

    即使您删除了连接器 (DELETE http://kafka-connect:8083/connectors/my-test-connector),然后使用相同的名称再次创建它,偏移量也会被保存,并且在您删除它时从它们开始继续消费。 (请注意保留政策,通常为 7 天)。

    【讨论】:

      猜你喜欢
      • 2017-01-08
      • 2019-07-02
      • 1970-01-01
      • 2020-12-08
      • 2019-01-20
      • 2017-05-27
      • 2018-09-30
      • 2020-11-27
      • 2016-12-31
      相关资源
      最近更新 更多