【问题标题】:With bottledwater-pg, how to read data by a Python consumer?使用bottledwater-pg,Python消费者如何读取数据?
【发布时间】:2015-06-25 10:02:21
【问题描述】:

我用 Python 写了一个消费者如下:

from kafka import KafkaConsumer
import avro.schema
import avro.io
import io

# To consume messages
consumer = KafkaConsumer('test',
                         group_id='',
                         bootstrap_servers=['kafka:9092'])


schema = """
{
    "namespace":"com.martinkl.bottledwater.dbschema.public",
    "type":"record",
    "name":"test",
    "fields":[
        {"name":"id","type":["int", "null"]},
        {"name":"value","type":["string", "null"]}
    ]
}
"""
schema = avro.schema.parse(schema)

for msg in consumer:
    bytes_reader = io.BytesIO(msg.value)
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    hello = reader.read(decoder)
    print hello

一切似乎都很好,但是当我运行将数据插入 Postgres 时:

postgres=# insert into test (value) values('hello world!');

consumer的输出为空:

$ python consumer_bottledwater-pg.py 
{u'id': 0, u'value': u''}

请帮我解决它。提前谢谢你。

【问题讨论】:

    标签: python avro apache-kafka


    【解决方案1】:

    Bottled Water 发布到 Kafka 的 Avro 编码消息以 5 字节标头为前缀。第一个字节始终为零(保留以备将来使用),接下来的 4 个字节是一个大端序的 32 位数字,指示架构的 ID。

    在您的示例中,您在 Python 应用程序中硬编码了架构,但是一旦上游数据库架构发生更改,这种方法就会失效。这就是为什么瓶装水最好与schema registry 结合使用。当您从 Kafka 读取消息时,您首先对标头进行解码以查找架构 ID,如果您之前没有看到该架构 ID,请 query the registry 查找架构。然后,您可以使用该模式解码消息的其余部分。架构可以缓存在消费者中,因为注册表保证特定 ID 的架构是不可变的。

    您还可以查看架构注册表随附的KafkaAvroDeserializer 的代码,以了解如何在 Java 中完成此解码。你可以在 Python 中做同样的事情。

    【讨论】:

    • 非常感谢您的快速回答和建议。
    • KafkaMessage(topic='test', partition=0, offset=127, key='\x00\x00\x00\x00\x01\x02\xf8\x01', value='\x00\x00\x00\x00\x02\x02\xf8\x01\x02\x18hello world!') 所以在我的情况下,如果我需要解码 Avro 数据,是否必须删除“值”中的 5 字节标头?
    • @NguyenSyThanhSon 是的。前 5 个字节\x00\x00\x00\x00\x02 表示它的模式 ID 为 2。只需查看我链接到的 Java 代码,它就向您展示了如何解码它。
    【解决方案2】:

    非常感谢@Martin Kleppmann。我按照你的指导做了。它工作正常。

    value = bytearray(msg.value)
    bytes_reader = io.BytesIO(value[5:])
    decoder = avro.io.BinaryDecoder(bytes_reader)
    reader = avro.io.DatumReader(schema)
    hello = reader.read(decoder)
    print hello
    

    详情请见python-kafka-avro

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-02-13
      • 1970-01-01
      • 1970-01-01
      • 2021-08-07
      相关资源
      最近更新 更多