【发布时间】:2019-05-18 16:09:26
【问题描述】:
我正在关注https://github.com/kaiwaehner/kafka-connect-iot-mqtt-connector-example 使用 MQTT 源连接器连接 Mosquitto 和 Kafka。我将 Mosquitto Publisher 发送的数据发送到 Mosquitto Subscriber 和 Kafka Consumer。但是我的 kafka-consumer 的 ConsumerRecord 对象中的键和值字段有一些前置字节字符。 下面是代码 sn-ps 和我得到的输出。
mqttPublisher.py
while v3 < 3:
data3 = {
"time": str(datetime.datetime.now().time()),
"val": v3
}
client.publish("sensor/dist", json.dumps(data3), qos=2)
v3 += 1
time.sleep(2)
mqttSubscriber.py
def on_message_print(client, userdata, message):
print(message.topic,message.payload)
subscribe.callback(on_message_print, "sensor/#", hostname="localhost")
kafkaConsumer.py
consumer = KafkaConsumer('mqtt.',
bootstrap_servers=['localhost:9092'])
for message in consumer:
print(message)
输出:mqttSubscriber.py
sensor/dist b'{"time": "12:44:30.817462", "val": 0}'
sensor/dist b'{"time": "12:44:32.820040", "val": 1}'
sensor/dist b'{"time": "12:44:34.822657", "val": 2}'
输出:kafkaConsumer.py
ConsumerRecord(topic='mqtt.', partition=0, offset=225, timestamp=1545117270870, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist' , value=b'\x00\x00\x00\x00\x02J{"time": "12:44:30.817462", "val": 0}', headers=[ ('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b' false')], checksum=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)
ConsumerRecord(topic='mqtt.', partition=0, offset=226, timestamp=1545117272821, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist' , value=b'\x00\x00\x00\x00\x02J{"time": "12:44:32.820040", "val": 1}', headers=[ ('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b' false')], checksum=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)
ConsumerRecord(topic='mqtt.', partition=0, offset=227, timestamp=1545117274824, timestamp_type=0, key=b'\x00\x00\x00\x00\x01\x16sensor/dist' , value=b'\x00\x00\x00\x00\x02J{"time": "12:44:34.822657", "val": 2}', headers=[ ('mqtt.message.id', b'0'), ('mqtt.qos', b'0'), ('mqtt.retained', b'false'), ('mqtt.duplicate', b' false')], checksum=None, serialized_key_size=17, serialized_value_size=43, serialized_header_size=62)
是什么导致 Kafka Consumer 中的上述额外字节前置? 提前致谢。
【问题讨论】:
标签: apache-kafka mqtt apache-kafka-connect mosquitto