【问题标题】:Batch Size Problem with MapR Streams Kafka APIMapR Streams Kafka API 的批量大小问题
【发布时间】:2020-01-22 11:28:21
【问题描述】:

您好,我正在使用 Kafka MapRStream 接收来自 Mapr Streams 主题的事件。

我正在尝试增加我的消费者的批量大小,但我在一批中收到的消息不超过30条!! p>

单个事件的大小约为 5000 字节。如果事件较小,我会在一批中获得更多。

这是我对 Consumer 的配置:

public static void main( String[] args ) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "batchSize");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 50000);
        props.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 26214400);
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 100 * 1024 * 1024);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);


        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList(TOPIC));
        long totalCount = 0;
        long start = System.currentTimeMillis();
        long countTimesNoMessages = 0;

        while (countTimesNoMessages < 10) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            totalCount += records.count();
            System.out.println(records.count());
            if (records.count() == 0) {
                countTimesNoMessages++;
            }
        }

        long end = System.currentTimeMillis();
        System.out.println((end - start) + " for " + totalCount + " messages");
    } 

【问题讨论】:

  • 每秒有多少条消息进入主题?
  • 主题满是消息,大约每秒500条,已经包含100万条

标签: java kafka-consumer-api mapr batchsize


【解决方案1】:

这些是可能的配置点。

https://mapr.com/docs/61/MapR_Streams/configuration-parameters.html

注意fetch.max.bytes 是总的最大值,所有分区上的sum(max.partition.fetch.bytes) 不能超过 fetch.max.bytes。

调整max.partition.fetch.bytes 是正常的,因此每个分区轮询超过64Kb(默认),并且还将调整fetch.max.bytes 以便它允许max.partition.fetch.bytes 正常工作。

您不应该将批量大小设置得太大。一旦轮询流的请求频率下降到每秒几百次左右,您就不太可能获得额外的性能改进,并且更有可能在线程失败的情况下遇到热点问题或大量重做工作.

【讨论】:

  • 默认情况下 fetch.max.bytes 已经是 (50*1024*1024) 50mb
  • 那么分区限制是多少?
  • 现在我将它设置为 3mb 但没有任何改变。我有 12 个分区和一个消费者。
  • 请用一个小的演示程序复制这个并公开发布代码。如果没有代码的详细信息,就无法调试它。
猜你喜欢
  • 1970-01-01
  • 2018-03-04
  • 1970-01-01
  • 2020-07-12
  • 2020-04-28
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-02-20
相关资源
最近更新 更多