【问题标题】:Confluent Kafka Python library configure producer for bulk msgConfluent Kafka Python 库为批量消息配置生产者
【发布时间】:2023-03-08 08:56:01
【问题描述】:

我需要设置 Kafka 生产者在一批中发送 500 条消息,而不是通过消息发送消息,而是批量导入。我检查了https://github.com/dpkp/kafka-python/issues/479 并尝试了producer.send_messages(topic, *message) 但它失败并出现错误:

>       producer.send_messages('event_connector_mt', *load_entries)
E       AttributeError: 'cimpl.Producer' object has no attribute 'send_messages'

我也尝试过像 producer.produce(topic, *message) 失败:

       producer.produce('event_connector_mt', *load_entries)
E       TypeError: function takes at most 8 arguments (501 given)

所以我挖掘了更多,发现我必须在生产者配置中将类型设置为 async 和 batch.size 大于默认值,但是当我尝试像这样的配置时:

from confluent_kafka import Consumer, Producer       

producer = Producer(**{'bootstrap.servers': KAFKA_BROKERS,
                       'queue.buffering.max.messages': 1000000,
                       'batch.num.messages': 500,
                       'batch.size': 19999,
                       'producer.type': 'async'
                       })

失败:

E       KafkaException: KafkaError{code=_INVALID_ARG,val=-186,str="No such configuration property: "producer.type""}

batch.size 的相同错误 你能告诉我我在哪里以及如何设置异步和批量大小或任何其他方式将批量消息传递给 Kafka 0.9.3.1

【问题讨论】:

    标签: python apache-kafka kafka-producer-api confluent-platform


    【解决方案1】:

    默认情况下,所有生产者都是异步的。底层 librdkafka 库不支持 Producer.type 和 batch.size 配置。

    因此,请使用可用的配置 batch.num.messages 或 message.max.bytes。

    【讨论】:

      【解决方案2】:

      您正在混淆名为“confluent-kafka-python”的 Confluent python 客户端

      http://docs.confluent.io/3.2.1/clients/confluent-kafka-python/

      使用“kafka-python”客户端

      https://github.com/dpkp/kafka-python

      这是两个具有不同 API 的不同客户端。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 2020-04-26
        • 1970-01-01
        • 2021-04-24
        • 2021-04-19
        • 1970-01-01
        • 2016-11-10
        • 2019-04-30
        相关资源
        最近更新 更多