【问题标题】:Streaming MSSQL CDC to AWS MSK with Debezium使用 Debezium 将 MSSQL CDC 流式传输到 AWS MSK
【发布时间】:2021-12-19 12:10:09
【问题描述】:

我是 Kafka 的新手,目前正在学习使用 Debezium 连接器将流数据从 MSSQL 更改为 Amazon MSK

我已经有一个启用 CDC 的 MS SQL Server,一个 MSK 集群,我可以通过 EC2 客户端手动连接、创建主题、生成和使用数据。现在我正在设置一个带有 Debezium SQL Server 连接器的 MSK Connect 作为自定义插件,这是我的 MSK 连接器配置:

connector.class = io.debezium.connector.sqlserver.SqlServerConnector, 
tasks.max = 1
database.hostname = xxx, 
database.port = xxx, 
database.user = xxx, 
database.password = xxx, 
database.dbname = dbName, 
database.server.name = serverName, 
table.include.list = dbo.tableName, 
database.history.kafka.bootstrap.servers = xxx, 
database.history.kafka.topic = xxx 

但我的 MSK 连接器一直返回“失败”状态。虽然我已经搜索过 Google,但似乎没有与我的想法相关的说明或指南。

这让我想知道我的解决方案是否可行?有人可以阐明一下并指出正确的方向吗?

已编辑:我从 CloudWatch 获得的一些日志

ERROR [AdminClient clientId=adminclient-1] Connection to node -2 () failed authentication due to: []: Access denied (org.apache.kafka.clients.NetworkClient:771)

INFO App info kafka.admin.client for adminclient-1 unregistered (org.apache.kafka.common.utils.AppInfoParser:83)

[INFO [AdminClient clientId=adminclient-1] Metadata update failed (org.apache.kafka.clients.admin.internals.AdminMetadataManager:235)

org.apache.kafka.connect.errors.ConnectException: Failed to connect to and describe Kafka cluster. Check worker's broker connection and security properties.

Caused by: org.apache.kafka.common.errors.SaslAuthenticationException: [4f91d358-fb7b-4f3b-8930-1b4aefce6d0b]: Access denied

[Worker-08134a52fe88cdc49] MSK Connect encountered errors and failed.

非常感谢,

【问题讨论】:

  • 日志是否告诉您失败的原因?
  • 一些日志说 SaslAuthenticationException 和 Failed to connect to and describe Kafka cluster。难道是我的 msk 连接器的角色没有足够的权限?或者也许我的 sqlserver 连接不正确?我正在使用端口 1433 的公共 ip4,我在帖子中更新了一些日志
  • 我不使用 MSK,但是如果您要连接到 SASL 或 SSL 侦听器,那么您还需要 Debezium 来定义 JAAS 或 SSL 属性
  • 问题是如何解决的?

标签: sql-server apache-kafka debezium aws-msk


【解决方案1】:

如果您为您的 MSK 集群使用基于 IAM 角色的身份验证,您的引导服务器端口将为 9098

除了所有属性,您还可以在 MSK 连接配置中发送这些属性

database.history.consumer.security.protocol=SASL_SSL
database.history.consumer.sasl.mechanism=AWS_MSK_IAM
database.history.consumer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.consumer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler
database.history.producer.security.protocol=SASL_SSL
database.history.producer.sasl.mechanism=AWS_MSK_IAM
database.history.producer.sasl.jaas.config=software.amazon.msk.auth.iam.IAMLoginModule required;
database.history.producer.sasl.client.callback.handler.class=software.amazon.msk.auth.iam.IAMClientCallbackHandler

参考:https://aws.amazon.com/blogs/aws/introducing-amazon-msk-connect-stream-data-to-and-from-your-apache-kafka-clusters-using-managed-connectors/

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2021-09-02
    • 2021-12-23
    • 2020-04-12
    • 2019-09-21
    • 2021-09-19
    • 1970-01-01
    • 1970-01-01
    • 2021-07-09
    相关资源
    最近更新 更多