【问题标题】:MQTT Kafka Source connector : funny byte charactersMQTT Kafka 源连接器:有趣的字节字符
【发布时间】:2019-05-18 16:09:26
【问题描述】:

我正在关注https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example 使用 MQTT 源连接器连接 Mosquitto 和 Kafka。我将 Mosquitto Publisher 发送的数据发送到 Mosquitto Subscriber 和 Kafka Consumer。但是我的 kafka-consumer 的 ConsumerRecord 对象中的键和值字段有一些前置字节字符。 下面是代码 sn-ps 和我得到的输出。

mqttPublisher.py

while v3 < 3:
             data3 = {
                      "time": str(datetime.datetime.now().time()),
                       "val": v3
                      }
             client.publish("sensor/dist", json.dumps(data3), qos=2)

             v3 += 1
             time.sleep(2)

mqttSubscriber.py

def on_message_print(client, userdata, message):
            print(message.topic,message.payload)

subscribe.callback(on_message_print, "sensor/#", hostname="localhost")

kafkaConsumer.py

consumer = KafkaConsumer('mqtt.',
                     bootstrap_servers=['localhost:9092'])

for message in consumer:
   print(message)

输出:mqttSubscriber.py

sensor/dist b'{"time": "12:44:30.817462", "val": 0}'

sensor/dist b'{"time": "12:44:32.820040", "val": 1}'

sensor/dist b'{"time": "12:44:34.822657", "val": 2}'

输出:kafkaConsumer.py

ConsumerRecord(topic='mqtt.', partition=0, offset=225, timestamp=1545117270870, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist' , value=b'\x00\x00\x00\x00\x02J{"time": "12:44:30.817462", "val": 0}', headers=[ ('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b' false')], checksum=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)

ConsumerRecord(topic='mqtt.', partition=0, offset=226, timestamp=1545117272821, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist' , value=b'\x00\x00\x00\x00\x02J{"time": "12:44:32.820040", "val": 1}', headers=[ ('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b' false')], checksum=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)

ConsumerRecord(topic='mqtt.', partition=0, offset=227, timestamp=1545117274824, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist' , value=b'\x00\x00\x00\x00\x02J{"time": "12:44:34.822657", "val": 2}', headers=[ ('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b' false')], checksum=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)

是什么导致 Kafka Consumer 中的上述额外字节前置? 提前致谢。

【问题讨论】:

    标签: apache-kafka mqtt apache-kafka-connect mosquitto


    【解决方案1】:

    作为演示的一部分,您将启动架构注册表

    启动 Kafka Connect 和依赖项(Kafka、Zookeeper、Schema Registry):

    confluent start connect

    如果您查看前 5 个字节,您会发现它们以 0 开头,然后是代表整数的另外 4 个字节。

    查看Schema Registry Wire Format 并尝试执行curl localhost:8081/subjects 以查看它是否列出了mqtt-keymqtt-value 的主题名称。

    如果您不想要 Avro,则需要配置和编辑 Kafka Connect 属性文件以使用不同的转换器,而不是使用 confluent start,除非让 Kafka 和 Zookeeper 运行

    或者如果你想让 Python 反序列化 Avro,你可以参考 Github 上的 confluent-kafka-python repo

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-08-26
      • 2021-08-04
      • 2021-02-25
      • 2017-10-12
      • 1970-01-01
      • 1970-01-01
      • 2019-12-16
      相关资源
      最近更新 更多