【问题标题】:Reading messages in bulk through a Pulsar consumer通过 Pulsar 消费者批量读取消息
【发布时间】:2020-09-22 03:54:11
【问题描述】:

我正在使用节点 pulsar 客户端来使用来自 Pulsar 主题的消息。消费者使用shared 订阅模式订阅主题。目前,对receive 的每次调用都会从主题中获取一条消息。有没有办法批量接收消息?

【问题讨论】:

    标签: apache-pulsar


    【解决方案1】:

    您收到一条一条消息的事实并不意味着 Pulsar 客户端没有在后台使用批处理和其他优化技术。 Pulsar Java 消费者的Official documentation 定义了定义消息累积的receiverQueueSize 参数。默认情况下,Pulsar 消费者对其参数使用合理的值,并且对于大多数应用程序来说它应该表现得相当好。您是否遇到过任何问题或性能下降?

    更新

    从 Apache Pulsar 2.4.1 版本开始,可以使用 consumer 批量接收消息。首先,应使用 BatchReceivePolicy 配置创建消费者(将值更改为更适合您的用例):

    Consumer<GenericRecord> consumer = pulsarClient
            .newConsumer(Schema.AUTO_CONSUME())
            .batchReceivePolicy(BatchReceivePolicy.builder()
                .maxNumMessages(5000)
                .maxNumBytes(10 * 1024 * 1024) 
                .timeout(1, TimeUnit.SECONDS).build())
    // .. other configuration such as topic and subscription
    

    其次,使用batchReceive方法获取一批消息:

    Messages<GenericRecord> messages = consumer.batchReceive();
    

    处理完所有消息后,只需确认所有消息:

    consumer.acknowledge(messages);
    

    【讨论】:

    • 我们在golang中是否有类似batchReceive的东西?
    • AFAIK,golang客户端没有实现
    猜你喜欢
    • 2021-04-24
    • 2022-11-02
    • 1970-01-01
    • 2016-12-15
    • 2021-12-02
    • 2017-11-09
    • 1970-01-01
    • 2022-07-14
    • 2016-06-14
    相关资源
    最近更新 更多