【问题标题】:Kafka does not always crash on failing flushKafka 并不总是在刷新失败时崩溃
【发布时间】:2019-07-25 10:44:13
【问题描述】:

我有一个名为 a_topic_that_does_not_exist 的 Kafka 主题,我想使用 Confluent Kafka Python client 向该主题发布一条消息。

顾名思义,上面的主题不存在,所以我预计在尝试发布它时会出错,因为 Kafka 服务器配置文件中的auto.create.topics.enable=false

这是我的sn-p代码:

from confluent_kafka.cimpl import Producer

print('Creating producer')
producer = Producer(
            {
                'bootstrap.servers': 'kafka.foo.com:9092',
            }
        )
print('Producing message 1')
producer.produce('a_topic_that_does_not_exist', b'A message 1')
print('Flushing 1')
producer.flush()
print('Producing message 2')
producer.produce('a_topic_that_does_not_exist', b'A message 2')
print('Flushing 2')
producer.flush()

这是输出:

Creating producer
Producing message 1
Flushing 1
Traceback (most recent call last):
Producing message 2
  File "ISSUE.py", line 14, in <module>
    producer.produce('a_topic_that_does_not_exist', b'A message 2')
cimpl.KafkaException: KafkaError{code=_UNKNOWN_TOPIC,val=-188,str="Unable to produce message: Local: Unknown topic"}

如您所见,第一次刷新成功返回,但下一次发布失败。 除此之外,我可以在第一次刷新之前生成任意数量的消息,而不会出现任何错误。

总而言之,感知到的行为似乎如下:

  1. 我为一个不存在的主题生成了一些消息
  2. 我冲洗
  3. 我为该主题生成另一条消息 -> 错误

最好在第 2 点而不是在第 3 点出现该错误。

更一般地说,我如何确定我生成的所有消息都已正确发布?

我目前的解决方案如下:

import time

from confluent_kafka.cimpl import Producer

print('Creating producer')
producer = Producer(
            {
                'bootstrap.servers': 'kafka.foo.com:9092',
            }
        )
print('Producing message 1')


errors = []
def delivery_report(err, msg):
    time.sleep(0.1)
    if err:
        errors.append(err)
    print('delivery_report({}, {})'.format(err, msg.value()))


for i in range(100):
    producer.poll(0)
    producer.produce('a_topic_that_does_not_exist', 'A message {}'.format(i), callback=delivery_report)
print('Flushing 1')
producer.flush()
if errors:
    raise ValueError
print('Producing message 2')
producer.produce('a_topic_that_does_not_exist', b'A message FINAL')
print('Flushing 2')
producer.flush()

【问题讨论】:

    标签: python python-3.x apache-kafka confluent-platform


    【解决方案1】:

    我遇到了同样的问题。据我了解,问题出在 Kafka 配置上。在这种情况下,Kafka 不允许生产者创建新主题。 如果新主题不存在,可以将 Kafka 配置为允许生产者创建新主题。正如您所提到的,当 Kafka 实际尝试通过调用 flush() 输入消息时,就会出现问题。在这种情况下,最好在调用 producer.produce(new_topic, message)

    后引发异常“无法创建主题”

    【讨论】:

      猜你喜欢
      • 2015-06-13
      • 2015-08-17
      • 2021-12-31
      • 2022-07-09
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多