【发布时间】:2020-12-14 05:58:59
【问题描述】:
我在 Python 2.7 中使用 Avro Producer。我需要发送带有键和值的消息, 该值在主题中有 Avro-Schema,但键没有 Avro-Schema(我无法为键添加 Schema - 遗留原因)。
这是我的代码:
def main():
kafkaBrokers = os.environ.get('KAFKA_BROKERS')
schemaRegistry = os.environ.get('SCHEMA_REGISTRY')
topic = os.environ.get('KAFKA_TOPIC')
subject = '${}-value'.format(topic)
sr = CachedSchemaRegistryClient(schemaRegistry)
schema = sr.get_latest_schema(subject).schema
value_schema = avro.loads(str(schema))
url = 'test.com'
value = {'url': u'test.com', 'priority': 10}
avroProducer = AvroProducer({
'bootstrap.servers': kafkaBrokers,
'schema.registry.url': schemaRegistry
}, default_value_schema=value_schema)
key = 1638895406382020875
avroProducer.produce(topic=topic, value=value, key=key)
avroProducer.flush()
我收到以下错误:
raise KeySerializerError("Avro schema required for key")
confluent_kafka.avro.serializer.KeySerializerError: Avro schema required for key
如果我从生产函数中删除密钥:
avroProducer.produce(topic=topic, value=value)
有效。
如何在没有架构的情况下发送密钥?
【问题讨论】:
标签: python apache-kafka avro kafka-producer-api