【问题标题】:How to consume message from kafka which was produced by kafka-python?如何使用由 kafka-python 生成的来自 kafka 的消息?
【发布时间】:2021-02-16 01:49:45
【问题描述】:

我想获得有关 kafka 的帮助。

是否可以使用由kafka-python 生成的kafka-console-consumer.sh 来消费utf8(日语)消息?

  • kafka-python 代码
self._client = KafkaProducer(\
                    bootstrap_servers=bootstrap_servers,\
                    api_version=api_version,\
                    value_serializer=lambda m: json.dumps(m).encode('utf-8'))

for message in data:
    self._client.send(topic, message)
  • kafka-cosole-consumer.sh
./kafka-console-consumer.sh --bootstrap-server msk.amazonaws.com:9092 --topic meetings --from-beginning --property value.deserializer=org.apache.kafka.common.serialization.BytesDeserializer
  • 结果
"uuid": "d/U55wRdSj60yJBcYVpt8A==", "id": 4459052115, "topic": "\x5Cu6e21\x5Cu8fba \x5Cu88d5\x5Cu306e\x5Cu30d1\x5Cu30fc\x5Cu30bd\x5Cu30ca\x5Cu30eb\x5Cu30df\x5Cu30fc\x5Cu30c6\x5Cu30a3\x5Cu30f3\x5Cu30b0\x5Cu30eb\x5Cu30fc\x5Cu30e0", "host": "\x5Cu6e21\x5Cu8fba \x5Cu88d5", "email": "y-watanabe@creationline.com", "user_type": "\x5Cu30e9\x5Cu30a4\x5Cu30bb\x5Cu30f3\x5Cu30b9\x5Cu6e08\x5Cu307f", "start_time": "2020-11-03T10:36:29Z", "end_time": "", "duration": "", "participants": 1, "has_pstn": false, "has_voip": false, "has_3rd_party_audio": false, "has_video": false, "has_screen_share": false, "has_recording": false, "has_sip": false, "dept": ""}

日文字符串(utf-8)似乎没有正确反序列化。它已损坏

我正在尝试通过以下路线发送消息。

producer(kafka-python) -> AWS MSK (2.4.1.1) -> consumer(kafka-console-consumer.sh) 

【问题讨论】:

  • 在我看来它没有损坏

标签: java python-3.x apache-kafka kafka-producer-api kafka-python


【解决方案1】:

我有几件事我没有正确理解。

  1. ByteDeserializer 将 Byte 对象转换为 Java 中的 Byte 类型。正如@codeflush.dev 所说,我看到了正确的行为。

将字节数组中的记录值反序列化为值或对象。

  1. 我错过了 json.dumps 转义非 ascii 字符的事实。这由 ensure_ascii 选项控制。

如果 ensure_ascii 为真(默认值),则保证输出所有传入的非 ASCII 字符都已转义。如果 ensure_ascii 为 false,这些字符将按原样输出。

通过将 ensure_ascii 设置为 False 然后转换为 byte ,解决了这个问题。

producer = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=lambda m: json.dumps(m, ensure_ascii=False).encode('utf-8'))

涉及

Saving utf-8 texts in json.dumps as UTF8, not as \u escape sequence

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-02-08
    • 2021-12-10
    • 1970-01-01
    • 2022-08-16
    • 2018-03-09
    • 2019-07-01
    • 1970-01-01
    • 2020-09-20
    相关资源
    最近更新 更多