【发布时间】:2019-07-25 10:44:13
【问题描述】:
我有一个名为 a_topic_that_does_not_exist 的 Kafka 主题,我想使用 Confluent Kafka Python client 向该主题发布一条消息。
顾名思义,上面的主题不存在,所以我预计在尝试发布它时会出错,因为 Kafka 服务器配置文件中的auto.create.topics.enable=false。
这是我的sn-p代码:
from confluent_kafka.cimpl import Producer
print('Creating producer')
producer = Producer(
{
'bootstrap.servers': 'kafka.foo.com:9092',
}
)
print('Producing message 1')
producer.produce('a_topic_that_does_not_exist', b'A message 1')
print('Flushing 1')
producer.flush()
print('Producing message 2')
producer.produce('a_topic_that_does_not_exist', b'A message 2')
print('Flushing 2')
producer.flush()
这是输出:
Creating producer
Producing message 1
Flushing 1
Traceback (most recent call last):
Producing message 2
File "ISSUE.py", line 14, in <module>
producer.produce('a_topic_that_does_not_exist', b'A message 2')
cimpl.KafkaException: KafkaError{code=_UNKNOWN_TOPIC,val=-188,str="Unable to produce message: Local: Unknown topic"}
如您所见,第一次刷新成功返回,但下一次发布失败。 除此之外,我可以在第一次刷新之前生成任意数量的消息,而不会出现任何错误。
总而言之,感知到的行为似乎如下:
- 我为一个不存在的主题生成了一些消息
- 我冲洗
- 我为该主题生成另一条消息 -> 错误
最好在第 2 点而不是在第 3 点出现该错误。
更一般地说,我如何确定我生成的所有消息都已正确发布?
我目前的解决方案如下:
import time
from confluent_kafka.cimpl import Producer
print('Creating producer')
producer = Producer(
{
'bootstrap.servers': 'kafka.foo.com:9092',
}
)
print('Producing message 1')
errors = []
def delivery_report(err, msg):
time.sleep(0.1)
if err:
errors.append(err)
print('delivery_report({}, {})'.format(err, msg.value()))
for i in range(100):
producer.poll(0)
producer.produce('a_topic_that_does_not_exist', 'A message {}'.format(i), callback=delivery_report)
print('Flushing 1')
producer.flush()
if errors:
raise ValueError
print('Producing message 2')
producer.produce('a_topic_that_does_not_exist', b'A message FINAL')
print('Flushing 2')
producer.flush()
【问题讨论】:
标签: python python-3.x apache-kafka confluent-platform