【Posted】:2021-10-04 03:45:38
【Question】:
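I want to ingest data from an MQTT broker (mosquitto) into a Kafka broker (Apache Kafka), so I followed these steps:
1/ I built my MQTT connector from the evokly GitHub repository: I downloaded the code, built the project with gradlew, and generated the .jar (kafka-connect-mqtt-1.1-SNAPSHOT.jar).
2/ I put the .jar file in the C:\kafka_2.13-2.8.0\connectors directory.
3/ After that, I added CLASSPATH to the system variables.
4/
5/
6/ I started ZooKeeper and the Kafka server.
7/ I started Kafka Connect:
.\bin\windows\connect-standalone .\config\connect-standalone.properties .\config\mqtt.properties
8/
9/ Now, when I start mosquitto and publish to the mqtt topic,
10/ my Kafka topic receives nothing.
Note: publishing and consuming on mosquitto works fine, and producing to and consuming from Kafka also works fine. Kafka Connect starts without reporting any problem, but I receive no data. Any help?
Here is the log: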
[2021-07-29 17:17:21,844] INFO Kafka Connect standalone worker initial
[2021-07-29 17:17:21,846] INFO Kafka Connect starting (org.apache.kafk
[2021-07-29 17:17:21,851] INFO Herder starting (org.apache.kafka.conne
[2021-07-29 17:17:21,853] INFO Worker starting (org.apache.kafka.conne
[2021-07-29 17:17:21,856] INFO Starting FileOffsetBackingStore with fi
[2021-07-29 17:17:21,867] INFO Worker started (org.apache.kafka.connec
[2021-07-29 17:17:21,868] INFO Herder started (org.apache.kafka.connec
[2021-07-29 17:17:21,870] INFO Initializing REST resources (org.apache
[2021-07-29 17:17:21,993] INFO Adding admin resources to main listener
[2021-07-29 17:17:22,214] INFO DefaultSessionIdManager workerName=node
[2021-07-29 17:17:22,216] INFO No SessionScavenger set, using defaults
[2021-07-29 17:17:22,220] INFO node0 Scavenging every 600000ms (org.ec
juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.inject.Provide
AVERTISSEMENT: A provider org.apache.kafka.connect.runtime.rest.resour
applicable in the SERVER runtime. Due to constraint configuration probnored.
juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.inject.Provide
AVERTISSEMENT: A provider org.apache.kafka.connect.runtime.rest.resourfaces applicable in the SERVER runtime. Due to constraint configuratioe will be ignored.
juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.inject.Provide
AVERTISSEMENT: A provider org.apache.kafka.connect.runtime.rest.resourable in the SERVER runtime.
Due to constraint configuration problems t
juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.inject.Provide
AVERTISSEMENT: A provider org.apache.kafka.connect.runtime.rest.resourlicable in the SERVER runtime. Due to constraint configuration problem
juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.Errors logErro
AVERTISSEMENT: The following warnings have been detected: WARNING: Theource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.c
WARNING: The (sub)resource method listConnectorPlugins in org.apache.k
WARNING: The (sub)resource method serverInfo in org.apache.kafka.conne
[2021-07-29 17:17:23,568] INFO Started o.e.j.s.ServletContextHandler@1
[2021-07-29 17:17:23,571] INFO REST resources initialized; server is s
[2021-07-29 17:17:23,573] INFO Kafka Connect started (org.apache.kafka
[2021-07-29 17:17:23,605] INFO AbstractConfig values:
(org.apache.kafka.common.config.AbstractConfig:372)
[2021-07-29 17:17:23,638] INFO Creating connector mqtt of type com.evo
[2021-07-29 17:17:23,641] INFO SourceConnectorConfig values:
config.action.reload = restart
connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConn
errors.log.enable = true
errors.log.include.messages = true
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = mqtt
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.SourceConnectorConfig:372)
[2021-07-29 17:17:23,649] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConn
errors.log.enable = true
errors.log.include.messages = true
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = mqtt
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorCo
[2021-07-29 17:17:23,696] INFO Instantiated connector mqtt with versioa.connect.runtime.Worker:284)
[2021-07-29 17:17:23,702] INFO Finished creating connector mqtt (org.a
[2021-07-29 17:17:23,705] INFO Start a MqttSourceConnector (com.evokly
[2021-07-29 17:17:23,707] INFO MqttSourceConnectorConfig values:
kafka.topic = kafka-test
message_processor_class = class com.evokly.kafka.connect.mqtt.
mqtt.clean_session = true
mqtt.client_id = null
mqtt.connection_timeout = 30
mqtt.keep_alive_interval = 60
mqtt.password = xxxxxx
mqtt.qos = 1
mqtt.server_uris = tcp://localhost:1883
mqtt.ssl.ca_cert = null
mqtt.ssl.cert = null
mqtt.ssl.key = null
mqtt.topic = mqtt
mqtt.user = null
(com.evokly.kafka.connect.mqtt.MqttSourceConnectorConfig:372)
[2021-07-29 17:17:23,714] INFO Initialize transform process properties (com.evokly.kafka.connect.mqtt.MqttSourceConnector:77)
[2021-07-29 17:17:23,719] INFO SourceConnectorConfig values:
config.action.reload = restart
connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConnector
errors.log.enable = true
errors.log.include.messages = true
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = mqtt
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.SourceConnectorConfig:372)
[2021-07-29 17:17:23,724] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConnector
errors.log.enable = true
errors.log.include.messages = true
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = mqtt
predicates = []
tasks.max = 1
topic.creation.groups = []
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:372)
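One detail in the MqttSourceConnectorConfig dump above is that `mqtt.password` is set while `mqtt.user` is `null`. Whether that matters for this broker is not established by the log; the sketch below only shows one way to pull `key = value` lines out of a Connect log and flag half-configured credentials. The helper names `parse_config_dump` and `credential_warnings` are hypothetical and are not part of Kafka Connect or the evokly connector.

```python
def parse_config_dump(lines):
    """Collect 'key = value' pairs from a Kafka Connect config dump."""
    config = {}
    for line in lines:
        line = line.strip()
        # Skip timestamped log headers and the trailing "(logger:line)" marker.
        if line.startswith("[") or line.startswith("(") or " = " not in line:
            continue
        key, value = line.split(" = ", 1)
        value = value.strip()
        # Connect prints unset options as the literal string "null".
        config[key.strip()] = None if value == "null" else value
    return config


def credential_warnings(config):
    """Return notes about MQTT credentials that are only half configured."""
    user = config.get("mqtt.user")
    password = config.get("mqtt.password")
    notes = []
    if password is not None and user is None:
        notes.append("mqtt.password is set but mqtt.user is null")
    if user is not None and password is None:
        notes.append("mqtt.user is set but mqtt.password is null")
    return notes


# Lines copied from the MqttSourceConnectorConfig dump in the log above.
sample = [
    "[2021-07-29 17:17:23,707] INFO MqttSourceConnectorConfig values:",
    "kafka.topic = kafka-test",
    "mqtt.password = xxxxxx",
    "mqtt.server_uris = tcp://localhost:1883",
    "mqtt.topic = mqtt",
    "mqtt.user = null",
    "(com.evokly.kafka.connect.mqtt.MqttSourceConnectorConfig:372)",
]
config = parse_config_dump(sample)
for note in credential_warnings(config):
    print(note)
```

Run against the values in this log, the check reports the password/user mismatch; it says nothing about whether the broker actually requires authentication.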
mosquitto log:
1627580533: mosquitto version 2.0.11 starting
1627580533: Using default config.
1627580533: Starting in local only mode. Connections will only be possible from clients running on this machine.
1627580533: Create a configuration file which defines a listener to allow remote access.
1627580533: For more details see https://mosquitto.org/documentation/authentication-methods/
1627580533: Opening ipv4 listen socket on port 1883.
1627580533: Opening ipv6 listen socket on port 1883.
1627580533: mosquitto version 2.0.11 running
1627588404: New connection from ::1:57594 on port 1883.
1627588404: New client connected from ::1:57594 as auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E (p2, c1, k60).
1627588404: No will message specified.
1627588404: Sending CONNACK to auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E (0, 0)
1627588404: Received PUBLISH from auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E (d0, q0, r0, m0, 'mqtt', ... (3 bytes))
1627588404: Received DISCONNECT from auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E
1627588404: Client auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E disconnected.
1627588938: New connection from ::1:57846 on port 1883.
1627588938: New client connected from ::1:57846 as auto-2798744F-5690-0500-211D-0EBD497E50F1 (p2, c1, k60).
1627588938: No will message specified.
1627588938: Sending CONNACK to auto-2798744F-5690-0500-211D-0EBD497E50F1 (0, 0)
1627588938: Received PUBLISH from auto-2798744F-5690-0500-211D-0EBD497E50F1 (d0, q0, r0, m0, 'mqtt', ... (3 bytes))
1627588938: Received DISCONNECT from auto-2798744F-5690-0500-211D-0EBD497E50F1
1627588938: Client auto-2798744F-5690-0500-211D-0EBD497E50F1 disconnected.
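When comparing the two logs it can help to see which MQTT packets each client exchanged and to convert mosquitto's epoch-second timestamps into readable times. The following is a minimal sketch, assuming the log line shapes shown above; `summarize_mosquitto_log` and `epoch_to_utc` are hypothetical helper names. The Connect log uses local wall-clock time and its time zone is not stated in the question, so the converted UTC values still need that offset before the two logs can be lined up.

```python
import re
from datetime import datetime, timezone

# Matches lines such as "1627588404: Received PUBLISH from <client-id> (...)".
PACKET_RE = re.compile(r"^(\d+): (?:Received|Sending) (\w+) (?:from|to) (\S+)")


def summarize_mosquitto_log(lines):
    """Map each client id to the set of MQTT packet types logged for it."""
    packets = {}
    for line in lines:
        match = PACKET_RE.match(line.strip())
        if match:
            _, packet_type, client_id = match.groups()
            packets.setdefault(client_id, set()).add(packet_type)
    return packets


def epoch_to_utc(seconds):
    """Convert an epoch-second timestamp to an ISO 8601 UTC string."""
    return datetime.fromtimestamp(int(seconds), tz=timezone.utc).isoformat()


# Lines copied from the mosquitto log above.
sample = [
    "1627588404: New connection from ::1:57594 on port 1883.",
    "1627588404: No will message specified.",
    "1627588404: Sending CONNACK to auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E (0, 0)",
    "1627588404: Received PUBLISH from auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E (d0, q0, r0, m0, 'mqtt', ... (3 bytes))",
    "1627588404: Received DISCONNECT from auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E",
]
packets = summarize_mosquitto_log(sample)
subscribers = [client for client, seen in packets.items() if "SUBSCRIBE" in seen]

print(packets)
print("clients that subscribed:", subscribers)
print("broker start (UTC):", epoch_to_utc("1627580533"))
```

In the excerpt shown, every client only publishes and disconnects; no client sends SUBSCRIBE. The excerpt may be incomplete, so this is a hint about where to look rather than a diagnosis.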
【Comments】:
-
Please show the files in the question itself rather than as external links (see the help center).
-
@OneCricketeer I have now put the images themselves in the question.
-
Does this work for you? mosquitto_sub -h localhost -p 1883 -t mqtt
-
It works when I start mosquitto with this command: start mosquitto. But when I start it with this command: net start mosquitto, I have to add -u user and -P password.
-
Also, please don't use images. Add the complete files as text. Some of us cannot access Imgur.
Tags: apache-kafka mqtt apache-kafka-connect