【Question title】: Strimzi kafka connect with debezium mongodb-connector not creating using REST (Mongodb as Source)
【Posted】: 2022-08-22 16:44:06
【Question】:

Installing Strimzi:

helm repo add strimzi https://strimzi.io/charts/ && helm install strimzi-kafka strimzi/strimzi-kafka-operator

Output:

Name:         strimzi-cluster-operator-587cb79468-hrs9q  
strimzi-cluster-operator with (quay.io/strimzi/operator:0.28.0)

I built the image (strimzi-kafka-connect:0.28) with the following Dockerfile:

FROM quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium
COPY ./debezium-connector-mysql/ /opt/kafka/plugins/debezium/
COPY ./debezium-connector-mongodb/ /opt/kafka/plugins/debezium/
COPY ./confluentinc-kafka-connect-elasticsearch/ /opt/kafka/plugins/debezium/
COPY ./mongodb-kafka-connect-mongodb-1.7.0/ /opt/kafka/plugins/debezium/
RUN chown -R kafka:root /opt/kafka
USER 1001

Here is the KafkaConnect configuration:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  version: 3.1.0
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  image: strimzi-kafka-connect:0.28.2
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1

Kafka was installed using https://github.com/strimzi/strimzi-kafka-operator/releases/tag/0.28.0

Specifically, kafka-persistent.yaml has the following configuration:

apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.1.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.1"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 10Gi
        deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}

Here is the status of the plugins:

kubectl exec my-connect-cluster-connect-59cfff997b-4kv9b -it my-connect-cluster-connect -- curl http://localhost:8083/connector-plugins | jq '.'
[
  {
    "class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "type": "sink",
    "version": "1.7.0"
  },
  {
    "class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "type": "source",
    "version": "1.7.0"
  },
  {
    "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type": "sink",
    "version": "11.1.8"
  },
  {
    "class": "io.debezium.connector.mongodb.MongoDbConnector",
    "type": "source",
    "version": "1.8.1.Final"
  },
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "1.0.0.Final"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "3.1.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "3.1.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]

When I create a connector for MySQL, everything works fine.

[kafka@my-connect-cluster-connect-59cfff997b-4kv9b kafka]$ curl -i -X POST -H "Accept:application/json" \
>     -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
>     -d  '{
>   "name": "inventory-test-mysql",
>   "config": {
>     "connector.class": "io.debezium.connector.mysql.MySqlConnector",
>     "tasks.max": "1",
>     "database.hostname": "172.17.0.7",
>     "database.port": "3306",
>     "database.user": "root",
>     "database.password": "debezium",
>     "database.server.id": "184054",
>     "database.server.name": "dbserver1",
>     "database.include.list": "inventory",
>     "database.history.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
>     "database.history.kafka.topic": "schema-changes-for-inventory",
>     "include.schema.changes": "true"
>   }
> }'
HTTP/1.1 201 Created
Date: Tue, 22 Feb 2022 11:50:14 GMT
Location: http://localhost:8083/connectors/inventory-test-mysql
Content-Type: application/json
Content-Length: 560
Server: Jetty(9.4.43.v20210629)

{"name":"inventory-test-mysql","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"172.17.0.7","database.port":"3306","database.user":"root","database.password":"debezium","database.server.id":"184054","database.server.name":"dbserver1","database.include.list":"inventory","database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes-for-inventory","include.schema.changes":"true","name":"inventory-test-mysql"},"tasks":[],"type":"source"}

The topics and the connector were created successfully; the output is below:

➜  ~ kubectl exec --tty -i kafka-client-strimzi -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --list
__consumer_offsets
__strimzi-topic-operator-kstreams-topic-store-changelog
__strimzi_store_topic
connect-cluster-configs
connect-cluster-offsets
connect-cluster-status
dbserver1
dbserver1.inventory.addresses
dbserver1.inventory.customers
dbserver1.inventory.geom
dbserver1.inventory.orders
dbserver1.inventory.products
dbserver1.inventory.products_on_hand
schema-changes-customers
schema-changes-for-inventory

Connector status:

[kafka@my-connect-cluster-connect-59cfff997b-4kv9b kafka]$ curl http://localhost:8083/connectors

["inventory-test-mysql"]

[kafka@my-connect-cluster-connect-59cfff997b-4kv9b kafka]$ curl http://localhost:8083/connectors/inventory-test-mysql/status

{"name":"inventory-test-mysql","connector":{"state":"RUNNING","worker_id":"172.17.0.19:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.17.0.19:8083"}],"type":"source"}

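Whenever I add or update a record in a table of the inventory database, I receive the schema changes.

Problem:

The same setup does not work for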

  {
    "class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "type": "sink",
    "version": "1.7.0"
  },
  {
    "class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "type": "source",
    "version": "1.7.0"
  }

or

  {
    "class": "io.debezium.connector.mongodb.MongoDbConnector",
    "type": "source",
    "version": "1.8.1.Final"
  }

Here is the curl output:

[kafka@my-connect-cluster-connect-59cfff997b-4kv9b kafka]$ curl -i -X POST -H "Accept:application/json" \
>     -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
>     -d  '{
>   "name": "mongodb-connector",
>   "config": {
>       "tasks.max":1,
>       "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
>       "connection.uri":"mongodb://root:password@172.17.0.20:27017",
>       "key.converter":"org.apache.kafka.connect.storage.StringConverter",
>       "value.converter":"org.apache.kafka.connect.storage.StringConverter",
>       "key.converter.schemas.enable": "false",
>       "value.converter.schemas.enable": "false",
>       "database":"mydb",
>       "collection":"dataSource"
>   }
> }'
HTTP/1.1 500 Server Error
Cache-Control: must-revalidate,no-cache,no-store
Content-Type: application/json
Content-Length: 137
Connection: close
Server: Jetty(9.4.43.v20210629)

{
"servlet":"org.glassfish.jersey.servlet.ServletContainer-1d98daa0",
"message":"Request failed.",
"url":"/connectors/",
"status":"500"
}

or with

[kafka@my-connect-cluster-connect-59cfff997b-4kv9b kafka]$ curl -i -X POST -H "Accept:application/json" \
>     -H  "Content-Type:application/json" http://localhost:8083/connectors/ \
>     -d  '{
>   "name": "mongodb-connector",
>   "config": {
>     "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
>     "mongodb.hosts": "rs0/172.17.0.20:27017,rs0/172.17.0.21:27017",
>     "mongodb.name": "mydb",
>     "mongodb.user": "root",
>     "mongodb.password": "password",
>     "database.whitelist": "mydb[.]*"
>   }
> }'
HTTP/1.1 500 Server Error
Cache-Control: must-revalidate,no-cache,no-store
Content-Type: application/json
Content-Length: 137
Connection: close
Server: Jetty(9.4.43.v20210629)

{
"servlet":"org.glassfish.jersey.servlet.ServletContainer-1d98daa0",
"message":"Request failed.",
"url":"/connectors/",
"status":"500"
}

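I would appreciate it if you could let me know what I am missing.

If REST is not possible with Strimzi and MongoDB, what is the alternative?

Is it related to the version of Strimzi/Kafka? Is it related to the version of the MongoDB plugin?

Thanks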

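  • The REST interface you are using comes from Apache Kafka, not from Strimzi. So I think you should check the Kafka Connect logs to see what the problem is and why it does not like your request, and check with the MongoDB people whether your request is correct.
  • PS: You probably should not copy all the plugins into /opt/kafka/plugins/debezium/ -> normally each plugin should have its own subdirectory under /opt/kafka/plugins/.
  • @Jakub Thanks for your reply. I built the image again and placed the plugins in /opt/kafka/plugins/. I now get this error in the logs: Caused by: java.lang.NoClassDefFoundError: org/apache/avro/Schema at com.mongodb.kafka.connect.source.MongoSourceConfig.createConfigDef(MongoSourceConfig.java:747). It looks like a version issue to me.
  • That suggests you are probably missing some classes that need to be added. It may be the Avro converter, which is not part of the Apache Kafka project.
  • @Jakub, solved. It was a version issue, which is why the MongoDB plugin could not find some classes. On minikube I downgraded the base image from quay.io/strimzi/kafka:0.28.0-kafka-3.1.0 to FROM strimzi/kafka:0.17.0-kafka-2.4.0, together with --kubernetes-version v1.21.10 (see more at github.com/kubernetes/kubernetes/tags). For this I had to recreate the cluster on minikube. I still need to check which versions up to 0.28.0 work. When I have time, I will publish an article with the complete working setup. Thanks for the feedback and for reading the question.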
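
The two diagnostic steps suggested in the comments above (read the Kafka Connect log, then find out whether the missing class is packaged with the plugin) can be sketched as two small shell helpers. This is only a sketch and not part of the original thread: the function names `connect_errors` and `jars_with_class` are hypothetical, the Deployment name `my-connect-cluster-connect` is inferred from the pod name in the question, and `unzip` is assumed to be available wherever the plugin directories are unpacked.

```shell
#!/bin/sh
# Hypothetical helpers for illustration; not part of Strimzi or Kafka Connect.

# Keep only the log lines that usually explain an opaque HTTP 500 from the
# Connect REST API: exception causes and class-loading failures.
connect_errors() {
  grep -E 'Caused by:|NoClassDefFoundError|ClassNotFoundException'
}

# Print every jar below directory $1 whose listing mentions the entry $2,
# for example org/apache/avro/Schema.class.
jars_with_class() {
  find "$1" -type f -name '*.jar' | while IFS= read -r jar; do
    if unzip -l "$jar" 2>/dev/null | grep -q -F -- "$2"; then
      printf '%s\n' "$jar"
    fi
  done
}

# Example usage (assumed names; adjust to your cluster and paths):
#   kubectl logs deployment/my-connect-cluster-connect | connect_errors
#   jars_with_class ./mongodb-kafka-connect-mongodb-1.7.0 org/apache/avro/Schema.class
```

If `jars_with_class` prints nothing for a plugin directory, that would suggest the class is not on that plugin's path, which would be consistent with the NoClassDefFoundError quoted above.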

Tags: mongodb strimzi mongodb-kafka-connector


【Solution 1】:

@Faaiz Have you had time to publish the article with the complete setup yet?

【Discussion】:
