【问题标题】:mqtt broker and kafka connect standalonemqtt 代理和 kafka 独立连接
【发布时间】:2021-10-04 03:45:38
【问题描述】:

我想从 mqtt 代理(mosquitto)摄取数据到 kafka 代理(apache kafka),所以我按照这些步骤进行

1/ 我从 evokly github 创建了我的 mqtt 连接器:所以我下载代码并使用 gradlew 构建项目并生成 .jar 依赖项 (kafka-connect-mqtt-1.1-SNAPSHOT.jar) 2/ 我将 .jar 文件放入 C:\kafka_2.13-2.8.0\connectors 目录 3/ 在我将 CLASSPATH 添加到系统变量之后

4/
5/ 6/我启动zookeeper和服务器kafka 7/ 我开始 kafka 连接

.\bin\windows\connect-standalone  .\config\connect-standalone.properties  .\config\mqtt.properties

8/ 9/ 现在当 istart mosquitto 并发布到 mqtt 主题时 10/ 我的 kafka 主题没有收到任何内容

注意:当我在 mosquitto 上发布和消费时,它也可以正常工作,当我在 kafka 上生产并从 kafka 消费时,它也可以正常工作,但是使用 kafka 连接没有问题,但我没有收到任何帮助数据

这是日志

    [2021-07-29 17:17:21,844] INFO Kafka Connect standalone worker initial
    [2021-07-29 17:17:21,846] INFO Kafka Connect starting (org.apache.kafk
    [2021-07-29 17:17:21,851] INFO Herder starting (org.apache.kafka.conne
    [2021-07-29 17:17:21,853] INFO Worker starting (org.apache.kafka.conne
    [2021-07-29 17:17:21,856] INFO Starting FileOffsetBackingStore with fi
    [2021-07-29 17:17:21,867] INFO Worker started (org.apache.kafka.connec
    [2021-07-29 17:17:21,868] INFO Herder started (org.apache.kafka.connec
    [2021-07-29 17:17:21,870] INFO Initializing REST resources (org.apache
    [2021-07-29 17:17:21,993] INFO Adding admin resources to main listener
    [2021-07-29 17:17:22,214] INFO DefaultSessionIdManager workerName=node
    [2021-07-29 17:17:22,216] INFO No SessionScavenger set, using defaults
    [2021-07-29 17:17:22,220] INFO node0 Scavenging every 600000ms (org.ec
    juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.inject.Provide
AVERTISSEMENT: A provider org.apache.kafka.connect.runtime.rest.resour
applicable in the SERVER runtime. Due to constraint configuration probnored.
    juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.inject.Provide
AVERTISSEMENT: A provider org.apache.kafka.connect.runtime.rest.resourfaces applicable in the SERVER runtime. Due to constraint configuratioe will be ignored.
    juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.inject.Provide
    AVERTISSEMENT: A provider org.apache.kafka.connect.runtime.rest.resourable in the SERVER runtime. 
    Due to constraint configuration problems t
    juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.inject.Provide
AVERTISSEMENT: A provider org.apache.kafka.connect.runtime.rest.resourlicable in the SERVER runtime. Due to constraint configuration problem
    juil. 29, 2021 5:17:23 PM org.glassfish.jersey.internal.Errors logErro
AVERTISSEMENT: The following warnings have been detected: WARNING: Theource contains empty path annotation.
    WARNING: The (sub)resource method createConnector in org.apache.kafka.
    WARNING: The (sub)resource method listConnectors in org.apache.kafka.c
    WARNING: The (sub)resource method listConnectorPlugins in org.apache.k
    WARNING: The (sub)resource method serverInfo in org.apache.kafka.conne

    [2021-07-29 17:17:23,568] INFO Started o.e.j.s.ServletContextHandler@1
    [2021-07-29 17:17:23,571] INFO REST resources initialized; server is s
    [2021-07-29 17:17:23,573] INFO Kafka Connect started (org.apache.kafka
    [2021-07-29 17:17:23,605] INFO AbstractConfig values:
 (org.apache.kafka.common.config.AbstractConfig:372)
    [2021-07-29 17:17:23,638] INFO Creating connector mqtt of type com.evo
    [2021-07-29 17:17:23,641] INFO SourceConnectorConfig values:
        config.action.reload = restart
        connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConn
        errors.log.enable = true
        errors.log.include.messages = true
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none
        header.converter = null
        key.converter = null
        name = mqtt
        predicates = []
        tasks.max = 1
        topic.creation.groups = []
        transforms = []
        value.converter = null
 (org.apache.kafka.connect.runtime.SourceConnectorConfig:372)
    [2021-07-29 17:17:23,649] INFO EnrichedConnectorConfig values:
        config.action.reload = restart
        connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConn
        errors.log.enable = true
        errors.log.include.messages = true
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none
        header.converter = null
        key.converter = null
        name = mqtt
        predicates = []
        tasks.max = 1
        topic.creation.groups = []
        transforms = []
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorCo
    [2021-07-29 17:17:23,696] INFO Instantiated connector mqtt with versioa.connect.runtime.Worker:284)
[2021-07-29 17:17:23,702] INFO Finished creating connector mqtt (org.a
[2021-07-29 17:17:23,705] INFO Start a MqttSourceConnector (com.evokly
[2021-07-29 17:17:23,707] INFO MqttSourceConnectorConfig values:
        kafka.topic = kafka-test
        message_processor_class = class com.evokly.kafka.connect.mqtt.
        mqtt.clean_session = true
        mqtt.client_id = null
        mqtt.connection_timeout = 30
        mqtt.keep_alive_interval = 60
        mqtt.password = xxxxxx
        mqtt.qos = 1
        mqtt.server_uris = tcp://localhost:1883
        mqtt.ssl.ca_cert = null
        mqtt.ssl.cert = null
        mqtt.ssl.key = null
        mqtt.topic = mqtt
        mqtt.user = null
 (com.evokly.kafka.connect.mqtt.MqttSourceConnectorConfig:372)
[2021-07-29 17:17:23,714] INFO Initialize transform process properties
[2021-07-29 17:17:23,714] INFO Initialize transform process properties[2021-07-29 17:17:23,714] INFO Initialize transform process properties
[2021-07-29 17:17:23,714] INFO Initialize transform process properties
(com.evokl
y.kafka.connect.mqtt.MqttSourceConnector:77)
[2021-07-29 17:17:23,719] INFO SourceConnectorConfig values:
[2021-07-29 17:17:23,714] INFO Initialize transform process properties
(c
om.evokly.kafka.connect.mqtt.MqttSourceConnector:77)
[2021-07-29 17:17:23,719] INFO SourceConnectorConfig values:
[2021-07-29 17:17:23,714] INFO Initialize transform process propertie
s                                                                    n
(com.evokly.kafka.connect.mqtt.MqttSourceConnector:77)
[2021-07-29 17:17:23,719] INFO SourceConnectorConfig values:
[2021-07-29 17:17:23,714] INFO Initialize transform process properties (com.evokly.kafka.connect.mqtt.MqttSourceConnector:77)
[2021-07-29 17:17:23,719] INFO SourceConnectorConfig values:
        config.action.reload = restart
        connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConnector        errors.log.enable = true
        errors.log.include.messages = true
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none
        header.converter = null
        key.converter = null
        name = mqtt
        predicates = []
        tasks.max = 1
        topic.creation.groups = []
        transforms = []
        value.converter = null
 (org.apache.kafka.connect.runtime.SourceConnectorConfig:372)
[2021-07-29 17:17:23,724] INFO EnrichedConnectorConfig values:
        config.action.reload = restart
        connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConnector        errors.log.enable = true        errors.log.include.messages = true
        errors.retry.delay.max.ms = 60000
        errors.retry.timeout = 0
        errors.tolerance = none        header.converter = null
        key.converter = null
        name = mqtt
        predicates = []
        tasks.max = 1
        topic.creation.groups = []        transforms = []
        value.converter = null
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:372)

蚊帐

    1627580533: mosquitto version 2.0.11 starting
1627580533: Using default config.
1627580533: Starting in local only mode. Connections will only be possible from clients running on this machine.
1627580533: Create a configuration file which defines a listener to allow remote access.
1627580533: For more details see https://mosquitto.org/documentation/authentication-methods/
1627580533: Opening ipv4 listen socket on port 1883.
1627580533: Opening ipv6 listen socket on port 1883.
1627580533: mosquitto version 2.0.11 running
1627588404: New connection from ::1:57594 on port 1883.
1627588404: New client connected from ::1:57594 as auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E (p2, c1, k60).
1627588404: No will message specified.
1627588404: Sending CONNACK to auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E (0, 0)
1627588404: Received PUBLISH from auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E (d0, q0, r0, m0, 'mqtt', ... (3 bytes))
1627588404: Received DISCONNECT from auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E
1627588404: Client auto-9C24C5D0-B74D-9EC0-49FE-B4EA2A55272E disconnected.
1627588938: New connection from ::1:57846 on port 1883.
1627588938: New client connected from ::1:57846 as auto-2798744F-5690-0500-211D-0EBD497E50F1 (p2, c1, k60).
1627588938: No will message specified.
1627588938: Sending CONNACK to auto-2798744F-5690-0500-211D-0EBD497E50F1 (0, 0)
1627588938: Received PUBLISH from auto-2798744F-5690-0500-211D-0EBD497E50F1 (d0, q0, r0, m0, 'mqtt', ... (3 bytes))
1627588938: Received DISCONNECT from auto-2798744F-5690-0500-211D-0EBD497E50F1
1627588938: Client auto-2798744F-5690-0500-211D-0EBD497E50F1 disconnected.

【问题讨论】:

  • 请在问题本身中显示文件,而不是外部链接。 help center
  • @OneCricketeer 现在我将图像本身置于问题之中
  • 这对你有用吗? mosquitto_sub -h localhost -p 1883 -t mqtt
  • 当我用这个命令启动 mosquitto 时它可以工作:启动 mosquitto 但是当我用这个命令启动 mosquitto 时:net start mosquitto 我必须添加 -u 用户和 -P 密码
  • 也不要使用图片。请添加完整的文件。我们中的一些人无法访问 Imgur

标签: apache-kafka mqtt apache-kafka-connect


【解决方案1】:

将以下条目添加到您的 mqtt.properties 文件中:

errors.log.enable=true
errors.log.include.messages=true
errors.tolerance=none

另外,在 /config 下的 Kafka 安装目录中找到文件 connect-log4j.properties 并更改此行:

log4j.rootLogger=INFO, stdout, connectAppender

到:

log4j.rootLogger=TRACE, stdout, connectAppender

您应该获得大量诊断信息以帮助澄清问题。如果没有解决问题,请将输出添加到问题中(请不要粘贴屏幕截图。复制文本输出以便我们搜索文本)

【讨论】:

  • 当我将这两行添加到 mqtt.properties 文件 kafka 连接停止
  • 我修正了答案 - 尝试新设置
  • 编辑您的问题并在那里添加日志。确保使用大括号“{}”工具将粘贴的文本标记为代码,以便其更具可读性。
  • 我认为问题出在蚊子经纪人身上
  • 看起来像。从位于github.com/evokly/kafka-connect-mqtt/blob/master/src/main/java/… 的代码中,您应该在任务开始时看到那条长线。它不存在,因此连接器实际上并没有启动任务,甚至没有访问 mqtt。请联系连接器开发人员寻求帮助。
猜你喜欢
  • 2021-10-15
  • 1970-01-01
  • 2020-11-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-01-24
  • 2021-10-21
  • 2019-06-22
相关资源
最近更新 更多