【发布时间】:2015-06-25 10:02:21
【问题描述】:
我用 Python 写了一个消费者如下:
from kafka import KafkaConsumer
import avro.schema
import avro.io
import io
# To consume messages
consumer = KafkaConsumer('test',
group_id='',
bootstrap_servers=['kafka:9092'])
schema = """
{
"namespace":"com.martinkl.bottledwater.dbschema.public",
"type":"record",
"name":"test",
"fields":[
{"name":"id","type":["int", "null"]},
{"name":"value","type":["string", "null"]}
]
}
"""
schema = avro.schema.parse(schema)
for msg in consumer:
bytes_reader = io.BytesIO(msg.value)
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(schema)
hello = reader.read(decoder)
print hello
一切似乎都很好,但是当我运行将数据插入 Postgres 时:
postgres=# insert into test (value) values('hello world!');
consumer的输出为空:
$ python consumer_bottledwater-pg.py
{u'id': 0, u'value': u''}
请帮我解决它。提前谢谢你。
【问题讨论】:
标签: python avro apache-kafka