【问题标题】:Confluent:Topic was not created after loading a ConnectorConfluent:加载连接器后未创建主题
【发布时间】:2019-02-15 20:52:55
【问题描述】:

我尝试连接多个数据库,包括 MySQL 和 MSSQL,但没有问题。

但是当我尝试连接到某个远程 MySQL 数据库(只能从我公司的网络访问)时,该数据库包含一个包含近 300 万条记录的视图,连接器已加载并且状态表示它正在运行但没有没有创建任何主题来使用其中的数据。

可能是什么原因?我在哪里可以找到正确的日志文件以了解发生了什么?

这是连接器的示例:

{
  "name": "mysql-source",
   "config": {
   "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
   "key.converter": "io.confluent.connect.avro.AvroConverter",
   "key.converter.schema.registry.url": "http://localhost:8081",
   "value.converter": "io.confluent.connect.avro.AvroConverter",
   "value.converter.schema.registry.url": "http://localhost:8081",
   "incrementing.column.name": "Id",
   "tasks.max": "1",
   "table.types": "VIEW",
   "table.whitelist": "ticket_rep",
   "mode": "incrementing",
   "topic.prefix": "mysql-",
   "name": "mysql-source",
   "validate.non.null": "false",
   "connection.url": "jdbc:mysql://XX.XXX.XX.XX:3306/database? 
    user=user&password=password"
 }

}

这些是我运行 confluent log connect 时的日志结果:

> [2018-09-11 16:37:57,382] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='foo', query='null', topicPrefix='mysql-', timestampColumn='null', incrementingColumn='id'}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:247)
java.sql.SQLException: Java heap space
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
    at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:122)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:975)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1025)
    at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:201)
    at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
    at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
    at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
[2018-09-11 16:38:02,523] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='foo', query='null', topicPrefix='mysql-', timestampColumn='null', incrementingColumn='id'}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:247)
com.mysql.cj.jdbc.exceptions.PacketTooBigException: Packet for query is too large (7,562,612 > 4,194,304). You can change this value on the server by setting the 'max_allowed_packet' variable.
    at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:107)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeInternal(ClientPreparedStatement.java:975)
    at com.mysql.cj.jdbc.ClientPreparedStatement.executeQuery(ClientPreparedStatement.java:1025)
    at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:201)
    at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
    at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
    at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:225)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:179)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

【问题讨论】:

  • 你确定MySQL上的表不为空吗?
  • @GiorgosMyrianthous 是的,这是一个包含近 300 万行的视图
  • 你能发布你的连接器的配置吗?请分享代码而不是截图。
  • 你检查过 Kafka Connect 日志吗?
  • 另外,这是您第一次尝试运行此连接器,还是您之前运行过它并且没问题?

标签: jdbc apache-kafka apache-kafka-connect confluent-platform


【解决方案1】:

com.mysql.cj.jdbc.exceptions.PacketTooBigException: Packet for query is too large 是 MySQL 端的一个问题,可以通过增加 max_allowed_packet 变量的值来解决。为此,您需要包含

max_allowed_packet=512M

my.cnf(或my.ini,取决于您运行的操作系统)文件(在[mysqld] 部分内)下,然后重新启动MySQL。重启 MySQL 后,

SHOW VARIABLES LIKE 'max_allowed_packet';

应该返回您在 MySQL 配置文件中设置的值。有关此错误的更多详细信息,您可以参考MySQL documentation


java.sql.SQLException: Java heap space,表示 Kafka connect 堆空间不足。可以通过运行来控制启动和最大堆大小

KAFKA_HEAP_OPTS="-Xms512m -Xmx1g" connect-standalone connect-worker.properties mysql-source-connector.properties

设置起始堆大小为 512 MB,最大大小为 1 GB。您可能需要根据需要更改尺寸。

【讨论】:

  • 我做了所有事情,但仍然出现“Java 堆空间”错误。显示变量,如“max_allowed_pa​​cket”;结果是:536870912
  • @MahmoudElbably 查看我的更新答案并告诉我。
  • 对不起,我似乎不明白应该在哪里运行这个命令?
  • 你之前是如何运行 Kafka Connect 的?
  • 非常感谢,问题解决了!非常感谢。
猜你喜欢
  • 1970-01-01
  • 2021-08-25
  • 1970-01-01
  • 2018-09-14
  • 2021-01-05
  • 2019-06-11
  • 2018-09-23
  • 2019-08-08
  • 2021-07-21
相关资源
最近更新 更多