[Posted]: 2022-08-22 16:44:06
[Question]:
Installing Strimzi:
helm repo add strimzi https://strimzi.io/charts/ && helm install strimzi-kafka strimzi/strimzi-kafka-operator
Output:
Name: strimzi-cluster-operator-587cb79468-hrs9q
strimzi-cluster-operator with (quay.io/strimzi/operator:0.28.0)
I built the image (strimzi-kafka-connect:0.28) with the following Dockerfile:
FROM quay.io/strimzi/kafka:0.28.0-kafka-3.1.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/debezium
COPY ./debezium-connector-mysql/ /opt/kafka/plugins/debezium/
COPY ./debezium-connector-mongodb/ /opt/kafka/plugins/debezium/
COPY ./confluentinc-kafka-connect-elasticsearch/ /opt/kafka/plugins/debezium/
COPY ./mongodb-kafka-connect-mongodb-1.7.0/ /opt/kafka/plugins/debezium/
RUN chown -R kafka:root /opt/kafka
USER 1001
This is the KafkaConnect configuration:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  version: 3.1.0
  replicas: 1
  bootstrapServers: my-cluster-kafka-bootstrap:9093
  image: strimzi-kafka-connect:0.28.2
  tls:
    trustedCertificates:
      - secretName: my-cluster-cluster-ca-cert
        certificate: ca.crt
  config:
    group.id: connect-cluster
    offset.storage.topic: connect-cluster-offsets
    config.storage.topic: connect-cluster-configs
    status.storage.topic: connect-cluster-status
    # -1 means it will use the default replication factor configured in the broker
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
Kafka was installed from https://github.com/strimzi/strimzi-kafka-operator/releases/tag/0.28.0
Specifically, kafka-persistent.yaml with the following configuration:
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
  name: my-cluster
spec:
  kafka:
    version: 3.1.0
    replicas: 3
    listeners:
      - name: plain
        port: 9092
        type: internal
        tls: false
      - name: tls
        port: 9093
        type: internal
        tls: true
    config:
      offsets.topic.replication.factor: 3
      transaction.state.log.replication.factor: 3
      transaction.state.log.min.isr: 2
      default.replication.factor: 3
      min.insync.replicas: 2
      inter.broker.protocol.version: "3.1"
    storage:
      type: jbod
      volumes:
      - id: 0
        type: persistent-claim
        size: 10Gi
        deleteClaim: false
  zookeeper:
    replicas: 3
    storage:
      type: persistent-claim
      size: 10Gi
      deleteClaim: false
  entityOperator:
    topicOperator: {}
    userOperator: {}
These are the plugins the Connect worker reports:
kubectl exec my-connect-cluster-connect-59cfff997b-4kv9b -it my-connect-cluster-connect -- curl http://localhost:8083/connector-plugins | jq '.'
[
  {
    "class": "com.mongodb.kafka.connect.MongoSinkConnector",
    "type": "sink",
    "version": "1.7.0"
  },
  {
    "class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "type": "source",
    "version": "1.7.0"
  },
  {
    "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type": "sink",
    "version": "11.1.8"
  },
  {
    "class": "io.debezium.connector.mongodb.MongoDbConnector",
    "type": "source",
    "version": "1.8.1.Final"
  },
  {
    "class": "io.debezium.connector.mysql.MySqlConnector",
    "type": "source",
    "version": "1.0.0.Final"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "3.1.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "3.1.0"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector",
    "type": "source",
    "version": "1"
  },
  {
    "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
    "type": "source",
    "version": "1"
  }
]
When I create a connector for MySQL, everything works.
[kafka@my-connect-cluster-connect-59cfff997b-4kv9b kafka]$ curl -i -X POST -H "Accept:application/json" \
> -H "Content-Type:application/json" http://localhost:8083/connectors/ \
> -d '{
> "name": "inventory-test-mysql",
> "config": {
> "connector.class": "io.debezium.connector.mysql.MySqlConnector",
> "tasks.max": "1",
> "database.hostname": "172.17.0.7",
> "database.port": "3306",
> "database.user": "root",
> "database.password": "debezium",
> "database.server.id": "184054",
> "database.server.name": "dbserver1",
> "database.include.list": "inventory",
> "database.history.kafka.bootstrap.servers": "my-cluster-kafka-bootstrap:9092",
> "database.history.kafka.topic": "schema-changes-for-inventory",
> "include.schema.changes": "true"
> }
> }'
HTTP/1.1 201 Created
Date: Tue, 22 Feb 2022 11:50:14 GMT
Location: http://localhost:8083/connectors/inventory-test-mysql
Content-Type: application/json
Content-Length: 560
Server: Jetty(9.4.43.v20210629)
{"name":"inventory-test-mysql","config":{"connector.class":"io.debezium.connector.mysql.MySqlConnector","tasks.max":"1","database.hostname":"172.17.0.7","database.port":"3306","database.user":"root","database.password":"debezium","database.server.id":"184054","database.server.name":"dbserver1","database.include.list":"inventory","database.history.kafka.bootstrap.servers":"my-cluster-kafka-bootstrap:9092","database.history.kafka.topic":"schema-changes-for-inventory","include.schema.changes":"true","name":"inventory-test-mysql"},"tasks":[],"type":"source"}
The topics and the connector were created successfully, as the following output shows:
➜ ~ kubectl exec --tty -i kafka-client-strimzi -- bin/kafka-topics.sh --bootstrap-server my-cluster-kafka-bootstrap:9092 --list
__consumer_offsets
__strimzi-topic-operator-kstreams-topic-store-changelog
__strimzi_store_topic
connect-cluster-configs
connect-cluster-offsets
connect-cluster-status
dbserver1
dbserver1.inventory.addresses
dbserver1.inventory.customers
dbserver1.inventory.geom
dbserver1.inventory.orders
dbserver1.inventory.products
dbserver1.inventory.products_on_hand
schema-changes-customers
schema-changes-for-inventory
Connector status:
[kafka@my-connect-cluster-connect-59cfff997b-4kv9b kafka]$ curl http://localhost:8083/connectors
["inventory-test-mysql"]
[kafka@my-connect-cluster-connect-59cfff997b-4kv9b kafka]$ curl http://localhost:8083/connectors/inventory-test-mysql/status
{"name":"inventory-test-mysql","connector":{"state":"RUNNING","worker_id":"172.17.0.19:8083"},"tasks":[{"id":0,"state":"RUNNING","worker_id":"172.17.0.19:8083"}],"type":"source"}
Whenever I add or update a record in a table of the inventory database, I receive the schema changes.
Problem:
The same setup does not work for
{
  "class": "com.mongodb.kafka.connect.MongoSinkConnector",
  "type": "sink",
  "version": "1.7.0"
},
{
  "class": "com.mongodb.kafka.connect.MongoSourceConnector",
  "type": "source",
  "version": "1.7.0"
}
or
{
  "class": "io.debezium.connector.mongodb.MongoDbConnector",
  "type": "source",
  "version": "1.8.1.Final"
}
This is the curl output:
[kafka@my-connect-cluster-connect-59cfff997b-4kv9b kafka]$ curl -i -X POST -H "Accept:application/json" \
> -H "Content-Type:application/json" http://localhost:8083/connectors/ \
> -d '{
> "name": "mongodb-connector",
> "config": {
> "tasks.max":1,
> "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
> "connection.uri":"mongodb://root:password@172.17.0.20:27017",
> "key.converter":"org.apache.kafka.connect.storage.StringConverter",
> "value.converter":"org.apache.kafka.connect.storage.StringConverter",
> "key.converter.schemas.enable": "false",
> "value.converter.schemas.enable": "false",
> "database":"mydb",
> "collection":"dataSource"
> }
> }'
HTTP/1.1 500 Server Error
Cache-Control: must-revalidate,no-cache,no-store
Content-Type: application/json
Content-Length: 137
Connection: close
Server: Jetty(9.4.43.v20210629)
{
"servlet":"org.glassfish.jersey.servlet.ServletContainer-1d98daa0",
"message":"Request failed.",
"url":"/connectors/",
"status":"500"
}
or with
[kafka@my-connect-cluster-connect-59cfff997b-4kv9b kafka]$ curl -i -X POST -H "Accept:application/json" \
> -H "Content-Type:application/json" http://localhost:8083/connectors/ \
> -d '{
> "name": "mongodb-connector",
> "config": {
> "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
> "mongodb.hosts": "rs0/172.17.0.20:27017,rs0/172.17.0.21:27017",
> "mongodb.name": "mydb",
> "mongodb.user": "root",
> "mongodb.password": "password",
> "database.whitelist": "mydb[.]*"
> }
> }'
HTTP/1.1 500 Server Error
Cache-Control: must-revalidate,no-cache,no-store
Content-Type: application/json
Content-Length: 137
Connection: close
Server: Jetty(9.4.43.v20210629)
{
"servlet":"org.glassfish.jersey.servlet.ServletContainer-1d98daa0",
"message":"Request failed.",
"url":"/connectors/",
"status":"500"
}
I would appreciate it if you could let me know what I am missing.
If REST cannot be used with Strimzi and MongoDB, what is the alternative?
Is it related to the version of Strimzi Kafka? Is it related to the version of the MongoDB plugin?
Thanks
-
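The REST interface you are using comes from Apache Kafka, not from Strimzi. So I think you should check the Kafka Connect logs to find out what the problem is and why it does not like your request, and check with the MongoDB people whether your request is correct.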
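A minimal sketch of that log check. It assumes the Connect worker runs as a Deployment named my-connect-cluster-connect (inferred from the pod name in the question) and that the root cause shows up on lines containing ERROR, Exception, or Caused by. The helper name filter_connect_errors is made up for illustration.

```shell
# Hypothetical helper: keep only the log lines that usually carry the root cause.
# Reads log text on stdin and prints the matching lines.
filter_connect_errors() {
  grep -E 'ERROR|Exception|Caused by'
}

# Usage (the Deployment name is an assumption inferred from the pod name):
#   kubectl logs deployment/my-connect-cluster-connect | filter_connect_errors
```

The REST response only says "Request failed.", so the stack trace in the worker log is likely the only place where the actual exception is visible.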
-
PS: You probably should not copy all the plugins into /opt/kafka/plugins/debezium/ -> normally each plugin should have its own subdirectory under /opt/kafka/plugins/.
-
@Jakub Thanks for your reply. I rebuilt the image and placed the plugins in /opt/kafka/plugins/. I now get this error in the logs: Caused by: java.lang.NoClassDefFoundError: org/apache/avro/Schema at com.mongodb.kafka.connect.source.MongoSourceConfig.createConfigDef(MongoSourceConfig.java:747). This looks like a version problem to me.
-
This suggests that you are missing some classes that need to be added. Possibly the Avro converter, which is not part of the Apache Kafka project.
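One way to check whether an Avro library is present among the plugin jars. This is a sketch that assumes the library ships as a file named avro-<version>.jar (the usual Maven artifact name) and that the plugins live under /opt/kafka/plugins as in the question. The function name find_avro_jars is made up for illustration.

```shell
# Hypothetical helper: list Avro jars anywhere below a plugin directory.
# Assumes the Maven-style file name avro-<version>.jar.
find_avro_jars() {
  find "$1" -type f -name 'avro-[0-9]*.jar'
}

# Usage inside the Connect container (path taken from the question):
#   find_avro_jars /opt/kafka/plugins
```

An empty result would be consistent with the NoClassDefFoundError for org/apache/avro/Schema, although it is not conclusive: the class could also be bundled inside another jar under a different file name.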
-
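@Jakub, solved. It was a version problem, which is why the mongodb plugin could not find certain classes. On minikube I downgraded the image from quay.io/strimzi/kafka:0.28.0-kafka-3.1.0 to FROM strimzi/kafka:0.17.0-kafka-2.4.0 and also set --kubernetes-version v1.21.10 (see more at github.com/kubernetes/kubernetes/tags). To do that I had to recreate the cluster on minikube. I still need to check which versions up to 0.28.0 work. When I have time, I will post an article with the complete working setup. Thanks for your feedback and for reading the question.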
Tags: mongodb strimzi mongodb-kafka-connector