【问题标题】:Not able to poll / fetch all records from kafka topic无法从 kafka 主题轮询/获取所有记录
【发布时间】:2020-03-13 07:22:41
【问题描述】:

我正在尝试从特定主题轮询数据,例如 kafka 正在接收 100 条记录/秒 但大多数时候它不会获取所有记录。 我使用超时时间为 5000 毫秒,并且每隔 100ms 调用此方法 注意:我也在订阅特定主题

@Scheduled(fixedDelayString = "100")

    public void pollRecords() {
        ConsumerRecords<String, String> records = 
        leadConsumer.poll("5000");

如何从 kafka 获取所有数据?

【问题讨论】:

    标签: java apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    从 poll() 返回的最大记录数由 max.poll.records 消费者配置参数指定。 (默认为 500)此外,还有另一个消费者配置参数限制了从服务器返回的最大数据量。 (fetch.max.bytesmax.partition.fetch.bytes

    另一方面,在代理方面还有另一个大小限制,称为message.max.bytes

    所以你应该正确设置这些参数以获得更多的消息。

    来自 Kafka 文档 (link):

    ma​​x.poll.records:单次返回的最大记录数 调用 poll()。 (默认:500)

    fetch.max.bytes:服务器应为获取请求返回的最大数据量。记录由分批获取 消费者,并且如果第一个非空的第一个记录批次 fetch的partition大于这个值,记录batch 仍然会被退回,以确保消费者可以取得进展。 因此,这不是绝对最大值。最大记录批量大小 代理接受的定义是通过 message.max.bytes (代理 config)或 max.message.bytes(主题配置)。请注意,消费者 并行执行多个提取。 (默认:52428800)

    message.max.bytes: Kafka 允许的最大记录批量大小。如果增加并且有 0.10.2 岁以上的消费者,则 消费者的获取大小也必须增加,以便他们可以 获取这么大的记录批次。在最新的消息格式版本中, 为了提高效率,记录总是分组为批次。在以前 消息格式版本,未压缩的记录不分组 批次,此限制仅适用于其中的单个记录 case.This 可以使用主题级别 max.message.bytes 为每个主题设置 配置。 (默认:1000012)

    ma​​x.partition.fetch.bytes:服务器将返回的每个分区的最大数据量。分批获取记录 由消费者。如果第一个记录批次中的第一个非空 fetch的partition大于这个限制,batch仍然会 被退回以确保消费者可以取得进展。最大值 代理接受的记录批量大小通过以下方式定义 message.max.bytes(代理配置)或 max.message.bytes(主题配置)。 有关限制消费者请求大小的信息,请参阅 fetch.max.bytes。 (默认:1048576)

    【讨论】:

      猜你喜欢
      • 2016-06-13
      • 1970-01-01
      • 1970-01-01
      • 2017-10-18
      • 1970-01-01
      • 2019-07-04
      • 1970-01-01
      • 2021-09-03
      • 1970-01-01
      相关资源
      最近更新 更多