【问题标题】:Kafka Connect confluent JDBC MySQL implementation errorsKafka Connect 融合 JDBC MySQL 实现错误
【发布时间】:2020-04-30 23:18:44
【问题描述】:

我正在尝试将 MySQL 与 Kafka Connect 连接,但出现了许多错误。我正在共享我的 connect-standalone.properties 和 mysql-jdbc-connector.properties,但出现了错误。我的 Kafka 和 MySQL 在不同的集群中,我使用的是 confluent 连接器,但不是在 confluent 界面中。我下载了 4.1.0 JDBC MySQL confluent 连接器。

mysql-jdbc-connector.properties

name=source-mysql
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.url=jdbc:mysql://1**.**.*.29:3306/kconnect?user=bigdata&password=bigdata
connection.user=bigdata
connection.password=bigdata
task.max=10
mode=bulk
topic.prefix=mysql-jdbc-
poll.interval.ms=3600000

connect-standalone.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# These are defaults. This file just demonstrates how to override some settings.
bootstrap.servers=Nifi-Staging:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include
# any combination of:
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Note: symlinks will be followed to discover dependencies or plugins.
# Examples:
plugin.path=/usr/share/java

运行时

bin/connect-standalone.sh config/connect-standalone.properties config/mysql-jdbc-connector.properties

结果是

 (io.confluent.connect.jdbc.source.JdbcSourceTaskConfig:347)
[2020-01-14 14:01:33,289] INFO WorkerSourceTask{id=source-mysql-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:200)
[2020-01-14 14:01:33,415] INFO [Producer clientId=connector-producer-source-mysql-0] Cluster ID: VgW2NunYREqVY5cHNS6snQ (org.apache.kafka.clients.Metadata:266)
[2020-01-14 14:01:43,610] INFO WorkerSourceTask{id=source-mysql-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:398)
[2020-01-14 14:01:43,611] INFO WorkerSourceTask{id=source-mysql-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:415)
[2020-01-14 14:01:44,319] ERROR WorkerSourceTask{id=source-mysql-0} Flush of offsets threw an unexpected exception:  (org.apache.kafka.connect.runtime.WorkerSourceTask:483)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.errors.ConnectException: java.nio.file.AccessDeniedException: /tmp/connect.offsets
        at java.util.concurrent.FutureTask.report(FutureTask.java:122)
        at java.util.concurrent.FutureTask.get(FutureTask.java:206)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.commitOffsets(WorkerSourceTask.java:472)
        at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.commit(SourceTaskOffsetCommitter.java:111)
        at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter.access$000(SourceTaskOffsetCommitter.java:46)
        at org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter$1.run(SourceTaskOffsetCommitter.java:84)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.ConnectException: java.nio.file.AccessDeniedException: /tmp/connect.offsets
        at org.apache.kafka.connect.storage.FileOffsetBackingStore.save(FileOffsetBackingStore.java:101)
        at org.apache.kafka.connect.storage.MemoryOffsetBackingStore$2.call(MemoryOffsetBackingStore.java:105)
        at org.apache.kafka.connect.storage.MemoryOffsetBackingStore$2.call(MemoryOffsetBackingStore.java:99)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        ... 3 more
Caused by: java.nio.file.AccessDeniedException: /tmp/connect.offsets
        at sun.nio.fs.UnixException.translateToIOException(UnixException.java:84)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:102)
        at sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:107)
        at sun.nio.fs.UnixFileSystemProvider.newByteChannel(UnixFileSystemProvider.java:214)
        at java.nio.file.spi.FileSystemProvider.newOutputStream(FileSystemProvider.java:434)
        at java.nio.file.Files.newOutputStream(Files.java:216)
        at org.apache.kafka.connect.storage.FileOffsetBackingStore.save(FileOffsetBackingStore.java:92)
        ... 6 more
[2020-01-14 14:01:44,326] ERROR WorkerSourceTask{id=source-mysql-0} Failed to commit offsets (org.apache.kafka.connect.runtime.SourceTaskOffsetCommitter:114)

【问题讨论】:

  • 您没有写入/tmp/connect.offsets 的权限。看看你的用户和这个目录的用户权限。

标签: apache-kafka apache-kafka-connect


【解决方案1】:

欢迎来到 StackOverflow :)

您在此处看到的错误:

java.nio.file.AccessDeniedException: /tmp/connect.offsets

表示问题 - 运行 Kafka Connect 进程的用户没有写入文件 /tmp/connect.offsets 的权限。 Kafka Connect 需要这个文件来存储连接器的进度。您应该使该文件夹可由用户写入,然后重新启动 Kafka Connect 工作程序。

【讨论】:

  • 或者改用connect-distributed,不需要写文件
【解决方案2】:

快速解决方案:

chown cp-kafka-connect:confluent /tmp/connect.offsets

我认为您需要管理权限。通过授予“cp-kafka-connect”权限来访问您从中提取数据的源目录,或者更改 systemd 服务“confluent-kafka-connect.service”的 UID 和 GID

【讨论】:

    猜你喜欢
    • 2021-01-13
    • 1970-01-01
    • 2019-02-23
    • 2018-11-30
    • 2020-11-09
    • 2020-07-09
    • 2020-10-23
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多