[Question title]: MySQL to Confluent Enterprise Kafka data ingestion
[Posted]: 2019-04-13 21:37:58
[Question]:

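We have a 3-node Confluent Enterprise Kafka cluster (Linux, on-prem), with one node running the Kafka Connect service. We want to ingest data from MySQL into Kafka topics.

Here is what we tried:

1. Installed MySQL on my local Windows desktop, created a database and a table, and inserted some data.

2. Created a source-quickstart-mysql.properties file with the following details: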

connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://<IPAddressOfLocalMachine>:3306/test_db?user=root&password=pwd
tables.whitelist=emp
mode=incrementing
incrementing.column.name=empid
topic.prefix=test-mysql-jdbc-

connect-standalone.properties contains:

bootstrap.servers=IPaddressOfKCnode:9092
plugin.path=/usr/share/java
3. Restarted the Kafka Connect service.

4. Tried submitting a request to the Kafka Connect service to connect to MySQL:

curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" IPaddressOfKCnode:8083/connectors/ -d '{"name": "emp-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://IPaddressOfLocalMachine:3306/test_db?user=root&password=pwd","table.whitelist": "emp","mode": "timestamp","topic.prefix": "mysql-" } }'

This returned the following error:

{"error_code":400,"message":"Connector configuration is invalid and contains the following 2 error(s):\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd for configuration Couldn't open connection to jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=admin for configuration Couldn't open connection to jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}

I also tried the following:

a. Stopped the Kafka Connect service in order to run it manually:

systemctl stop confluent-kafka-connect

b. Ran Connect in standalone mode like this:

/usr/bin/connect-standalone /etc/kafka/connect-standalone.properties /etc/kafka-connect-jdbc/source-quickstart-mysql.properties

The process starts successfully but exits after a while. Here are the logs:

[2018-11-10 19:42:53,027] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
[2018-11-10 19:42:53,048] INFO AbstractConfig values:
        batch.max.rows = 100
        catalog.pattern = null
        connection.attempts = 3
        connection.backoff.ms = 10000
        connection.password = null
        connection.url = jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd
        connection.user = null
        dialect.name =
        incrementing.column.name = empid
        mode = incrementing
        numeric.mapping = null
        numeric.precision.mapping = false
        poll.interval.ms = 5000
        query =
        schema.pattern = null
        table.blacklist = []
        table.poll.interval.ms = 60000
        table.types = [TABLE]
        table.whitelist = []
        timestamp.column.name = []
        timestamp.delay.interval.ms = 0
        topic.prefix = test-mysql-jdbc-
        validate.non.null = true
 (org.apache.kafka.common.config.AbstractConfig:279)
[2018-11-10 19:45:00,439] INFO AbstractConfig values:
        batch.max.rows = 100
        catalog.pattern = null
        connection.attempts = 3
        connection.backoff.ms = 10000
        connection.password = null
        connection.url = jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=admin
        connection.user = null
        dialect.name =
        incrementing.column.name = empid
        mode = incrementing
        numeric.mapping = null
        numeric.precision.mapping = false
        poll.interval.ms = 5000
        query =
        schema.pattern = null
        table.blacklist = []
        table.poll.interval.ms = 60000
        table.types = [TABLE]
        table.whitelist = []
        timestamp.column.name = []
        timestamp.delay.interval.ms = 0
        topic.prefix = test-mysql-jdbc-
        validate.non.null = true
 (org.apache.kafka.common.config.AbstractConfig:279)
[2018-11-10 19:47:07,666] ERROR Failed to create job for /etc/kafka-connect-jdbc/source-quickstart-mysql.properties (org.apache.kafka.connect.cli.ConnectStandalone:102)
[2018-11-10 19:47:07,668] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
        at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
        at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:110)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
        at org.apache.kafka.connect.runtime.AbstractHerder.maybeAddConfigErrors(AbstractHerder.java:415)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:189)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
[2018-11-10 19:47:07,669] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:65)

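I could not find clear, complete documentation on the Confluent website for using the Kafka Connect service with its various connectors, configurations, and so on. Please help with the correct steps to implement this ingestion pipeline: MySQL -> Kafka Connect -> Kafka.

In the end, I expect inserts into the MySQL table to produce data in the Kafka topic, and a Kafka consumer to display those records. This ingestion seems simple, but I am missing some basic connection property :(

Thanks!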


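    Tags: mysql apache-kafka apache-kafka-connect confluent-platform


    [Answer 1]:

    This looks like a problem with the JDBC connector: it cannot find a MySQL driver. Which MySQL version are you running? To fix it:

    1. Download MySQL Connector/J: version 8.0.13 if you are running MySQL 8, or 5.1.47 for older versions.
    2. Place the jar file under /usr/share/java/kafka-connect-jdbc/.
    3. Restart Kafka Connect and start your MySQL connector.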
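
Before restarting, it can help to confirm that a driver jar is actually present in the connector's directory. The following is a minimal Python sketch, not part of the original answer. The helper name `find_mysql_driver_jars` is hypothetical, and the `mysql-connector-*.jar` pattern is an assumption about how the Connector/J jar is named on your system.

```python
from pathlib import Path


def find_mysql_driver_jars(plugin_dir):
    """Return the sorted names of MySQL driver jars found directly in plugin_dir.

    The glob pattern is an assumption about the jar's file name
    (for example mysql-connector-java-8.0.13.jar); adjust it if yours differs.
    """
    directory = Path(plugin_dir)
    if not directory.is_dir():
        return []
    return sorted(p.name for p in directory.glob("mysql-connector-*.jar"))


if __name__ == "__main__":
    # Directory taken from step 2 of the answer.
    jars = find_mysql_driver_jars("/usr/share/java/kafka-connect-jdbc")
    if jars:
        print("Found driver jar(s):", ", ".join(jars))
    else:
        print("No MySQL driver jar found in the connector directory.")
```

An empty result is consistent with the "No suitable driver found" error in the question, although a jar being present does not by itself prove that Kafka Connect has loaded it; the service still needs a restart.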


      [Answer 2]:

      First, the error returned in the output of your curl command:

      Connector configuration is invalid and contains the following 2 error(s) java.sql.SQLException: No suitable driver found for jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd

      So the MySQL JDBC driver is missing from your Kafka Connect path.


      The second error is in the log output you posted:

      Connector configuration is invalid and contains the following 2 error(s): Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure. The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://192.168.178.14:3306/test_db?user=root&password=admin

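      This indicates that Kafka Connect cannot reach your MySQL machine.

      Where are you running Confluent Platform: in Docker, on the same machine as MySQL, or somewhere else? Is 192.168.178.14 the address of your MySQL server? Is it reachable from the host running Kafka Connect?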
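
One way to answer the reachability question is to attempt a plain TCP connection from the Kafka Connect host to the MySQL port. The snippet below is a minimal sketch using only the Python standard library; the function name `can_reach` is hypothetical, and the address and port in the comment are the ones from the error message.

```python
import socket


def can_reach(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port succeeds within timeout seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures.
        return False


# Example, to be run on the host where Kafka Connect runs:
# print(can_reach("192.168.178.14", 3306))
```

A `False` result points to a network-level problem such as a firewall or a server bound only to localhost. A `True` result only shows that the port is open; MySQL can still reject the login if the user is not allowed to connect from that host.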


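      Several examples of setting up MySQL with Kafka are available.

      For the pros and cons of the JDBC connector versus log-based CDC, see https://www.confluent.io/blog/no-more-silos-how-to-integrate-your-databases-with-apache-kafka-and-cdc

      Disclaimer: I wrote the blog post above.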


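        [Answer 3]:

        Thanks, Robin & Giorgos, for the answers! They helped a lot. The problem came down to two things:

        1. The MySQL JDBC driver jar was missing. We had to place MySQL Connector/J 8.0.13 under /usr/share/java/kafka-connect-jdbc/.

        2. The connection problem occurred because the MySQL user that Kafka Connect was connecting as did not have sufficient privileges to connect from a remote host. To fix this, I created a new MySQL user with full privileges and with access from the remote server (the Kafka Connect host).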

        After completing the steps above and restarting Kafka Connect, the ingestion pipeline started working.
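
For reference, a connector request that reflects these fixes might look like the sketch below. It is an illustration, not the configuration the poster finally used: the helper names `build_payload` and `submit` are hypothetical, and the user name and password are placeholders. The config keys are the ones that appear in the question and its log output. Note that the log lists the key as `table.whitelist` with an empty value, whereas the question's properties file spells it `tables.whitelist`, so that spelling is worth checking as well.

```python
import json
import urllib.request


def build_payload(name, mysql_host, database, user, password, table, id_column, topic_prefix):
    """Build the JSON body for POST /connectors on the Kafka Connect REST API."""
    return {
        "name": name,
        "config": {
            "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
            "tasks.max": "1",
            "connection.url": f"jdbc:mysql://{mysql_host}:3306/{database}",
            # Credentials passed as separate keys instead of in the URL.
            "connection.user": user,
            "connection.password": password,
            "table.whitelist": table,
            "mode": "incrementing",
            "incrementing.column.name": id_column,
            "topic.prefix": topic_prefix,
        },
    }


def submit(connect_url, payload):
    """POST the payload to Kafka Connect and return the decoded JSON response.

    urlopen raises urllib.error.HTTPError for error responses such as the 400 in the question.
    """
    request = urllib.request.Request(
        f"{connect_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


payload = build_payload(
    name="emp-connector",
    mysql_host="192.168.178.14",
    database="test_db",
    user="connect_user",   # hypothetical MySQL user allowed to connect remotely
    password="change-me",  # placeholder
    table="emp",
    id_column="empid",
    topic_prefix="test-mysql-jdbc-",
)
print(json.dumps(payload, indent=2))
# To submit: submit("http://IPaddressOfKCnode:8083", payload)
```

With this configuration, rows from the `emp` table would be expected to land in a topic named after the prefix plus the table name, here `test-mysql-jdbc-emp`.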
