【问题标题】:Kafka consumer does not fetch new records when using topic pattern and large messagesKafka 消费者在使用主题模式和大消息时不会获取新记录
【发布时间】:2021-01-18 10:45:28
【问题描述】:

我希望你们中的某个人可以帮助我。

我正在使用 spring boot 2.3.4spring kafka 2.5.6。我最近不得不重置偏移量并看到一些奇怪的行为。我们消费了这些消息,但在每 X 条(变化的)消息之后,我们在消费继续之前有 10 秒的超时。

这是我的配置:

spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      enable-auto-commit: false
      auto-offset-reset: earliest
      heartbeat-interval: 1000
      max-poll-records: 50
      group-id: kafka-fetch-demo
      fetch-max-wait: 10000
    listener:
      type: single
      concurrency: 1
      poll-timeout: 1000
      no-poll-threshold: 2
      monitor-interval: 10
      ack-mode: manual
    producer: 
      acks: all
      batch-size: 0
      retries: 0

这是一个示例监听器代码:

  @KafkaListener(id = LISTENER_ID, idIsGroup = false, topicPattern = "#{demoProperties.getTopicPattern()}")
  public void onEvent(Acknowledgment acknowledgment, ConsumerRecord<byte[], String> record) {
    log.info("Received record on topic {}, partition {} and offset {}",
            record.topic(),
            record.partition(),
            record.offset());

    acknowledgment.acknowledge();
  }

分析

我发现 10 秒超时来自 fetch.max.wait.ms 属性。但是我无法弄清楚为什么这个属性适用。

据我了解,fetch-max-wait 属性仅确定代理在向消费者提供新记录之前等待的最长时间,即使未超过 fetch.min.bytes。 (在我的情况下设置为默认值 1 并且应始终填写) 此外,我分析了这个问题仅适用于使用主题模式和“更大”消息时。

复制

我在 Github 上上传了一个演示应用程序来重现该问题:https://github.com/kraennix/kafka-fetch-demo

我是如何复制它的:

  1. 我在一个 kafka 主题上放置了一千条消息,每条消息 17.1 KB。
  2. 我启动了我的消费应用程序,该应用程序按主题模式侦听该主题。然后你可以看到这种停止行为。

注意:如果我对“小”消息(89 字节)执行相同操作,它会按预期工作。

日志

在日志中,您可以看到成功提交,但随后显示 Skipping fetch

2021-01-16 15:04:40.773 DEBUG 19244 --- [_LISTENER-0-C-1] essageListenerContainer$ListenerConsumer : Commit list: {publish.LargeTopic.2.test-0=OffsetAndMetadata{offset=488, leaderEpoch=null, metadata=''}}
2021-01-16 15:04:40.773 DEBUG 19244 --- [_LISTENER-0-C-1] essageListenerContainer$ListenerConsumer : Committing: {publish.LargeTopic.2.test-0=OffsetAndMetadata{offset=488, leaderEpoch=null, metadata=''}}
2021-01-16 15:04:40.773 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Sending OffsetCommit request with {publish.LargeTopic.2.test-0=OffsetAndMetadata{offset=488, leaderEpoch=null, metadata=''}} to coordinator localhost:9092 (id: 2147483647 rack: null)
2021-01-16 15:04:40.773 DEBUG 19244 --- [_LISTENER-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Using older server API v7 to send OFFSET_COMMIT {group_id=kafka-fetch-demo,generation_id=4,member_id=consumer-kafka-fetch-demo-1-cf8e747f-531d-457a-aca8-18960c518ef9,group_instance_id=null,topics=[{name=publish.LargeTopic.2.test,partitions=[{partition_index=0,committed_offset=488,committed_leader_epoch=-1,committed_metadata=}]}]} with correlation id 62 to node 2147483647
2021-01-16 15:04:40.778 TRACE 19244 --- [_LISTENER-0-C-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Completed receive from node 2147483647 for OFFSET_COMMIT with correlation id 62, received {throttle_time_ms=0,topics=[{name=publish.LargeTopic.2.test,partitions=[{partition_index=0,error_code=0}]}]}
2021-01-16 15:04:40.779 DEBUG 19244 --- [_LISTENER-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Committed offset 488 for partition publish.LargeTopic.2.test-0
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.1.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.2.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.1.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed
2021-01-16 15:04:40.779 TRACE 19244 --- [_LISTENER-0-C-1] o.a.k.c.consumer.internals.Fetcher       : [Consumer clientId=consumer-kafka-fetch-demo-1, groupId=kafka-fetch-demo] Skipping fetch for partition publish.LargeTopic.2.test-0 because previous request to localhost:9092 (id: 0 rack: null) has not been processed

【问题讨论】:

    标签: spring spring-boot apache-kafka kafka-consumer-api spring-kafka


    【解决方案1】:

    当消息的大小发生变化时,您可能需要更改以下 2 个 Props 心跳间隔:1000 最大轮询记录:50

    您的心跳间隔为 1 秒,最长轮询等待时间为 10 秒。如果消息的大小很大,并且您正在同一个线程中处理消费的消息,那么在下一次 Pull 触发时,心跳检查将失败。确保 Executor 使用 Callable 处理消息。

    当消息大小较大时,将心跳间隔增加到 5 到 10 秒,并将 Max Poll 记录减少到 15。希望,这可以帮助

    【讨论】:

    • 感谢您的回答。 AFAIK 心跳始终在后台运行,而不是在轮询/处理线程中。此外,我不认为最大轮询记录或一般时间应该是一个问题,因为应用程序可以每毫秒处理多个记录。所以不应该有任何重叠的调用。我也看不到此属性与主题 / topicPattern 属性之间的相关性。 (注解属性topics 按预期工作)
    • 我同意 Heart beat 在后台运行。但是,这一切都归结为线程争用。如果你的心跳线程没有得到你的数据处理的时间片,那么显然会失败。
    猜你喜欢
    • 2021-07-09
    • 2019-06-25
    • 1970-01-01
    • 2018-07-27
    • 2020-12-12
    • 2021-01-26
    • 1970-01-01
    • 2022-01-20
    • 2017-12-07
    相关资源
    最近更新 更多