【问题标题】:Python confluent kafka raise exception on broker connection disconnectPython confluent kafka 在代理连接断开时引发异常
【发布时间】:2020-11-03 12:32:39
【问题描述】:

我正在使用 python 3.7 和 confluent-kafka。

以下是我用来轮询 kafka 服务器并读取消息的伪代码。

        while True:
            MSG = CONSUMER.poll(0.1)
            if MSG is None:
                CONSUMER.commit()
                print('No msg')
                continue
            if MSG.error():
                print("Consumer error: {}".format(MSG.error()))
                CONSUMER.commit()
                continue
            try:
                rawMsg = format(MSG.value().decode('utf-8'))
                testmsg = json.loads(rawMsg)
            except:
                print('invalid json format msg')
                CONSUMER.commit()

如果 kafka 服务器由于某种原因关闭或断开连接,我希望抛出异常。

目前,如果发生上述情况,while 循环会继续运行而不会出现任何错误并打印 No msg

如何获得异常或检查每次循环中是否可以连接kafka服务器(如果要进行一些检查,它应该是轻量级的)

【问题讨论】:

标签: apache-kafka python-3.7 confluent-kafka-python


【解决方案1】:

创建消费者时,您可以在反序列化器中指定a callback for error

这是一个在生产者中使用相同机制的示例:

import confluent_kafka
def error_callback(err):
    print("callback hit!")
    raise(err)
p = confluent_kafka.Producer({
    "bootstrap.servers": "localhost:9092",
    "message.max.bytes": 10_000_000,
    "error_cb": error_callback,
    "debug": "msg",
})
p.produce("test-topic", "a" * int(2e6))
p.flush()

From github issues。这可能会有所帮助,但并不能解决问题。

【讨论】:

    猜你喜欢
    • 2020-07-18
    • 1970-01-01
    • 1970-01-01
    • 2017-06-01
    • 2021-04-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-08
    相关资源
    最近更新 更多