【问题标题】:Consumer does not receive messages after kafka producer/consumer restartkafka生产者/消费者重启后消费者收不到消息
【发布时间】:2017-09-12 12:59:34
【问题描述】:

我们有一个生产者、一个消费者和一个分区。消费者/生产者都是 Spring Boot 应用程序。消费者应用程序在我的本地机器上运行,而生产者与 kafka 和 zookeeper 在远程机器上运行。

在开发过程中,我重新部署了我的生产者应用程序并进行了一些更改。但在那之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有运气。可能是什么问题和/或如何解决?

消费者配置:

spring:
  cloud:
    stream:
      defaultBinder: kafka
      bindings:
        input:
          destination: sales
          content-type: application/json
      kafka:
        binder:
          brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
          zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
          defaultZkPort: 2181
          defaultBrokerPort: 9092
server:
  port: 0

生产者配置

cloud:
stream:
  defaultBinder: kafka
  bindings:
    output:
      destination: sales
      content-type: application/json
  kafka:
    binder:
      brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
      zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
      defaultZkPort: 2181
      defaultBrokerPort: 9092

EDIT2

消费者应用程序在 5 分钟后终止,但出现以下异常:

2017-09-12 18:14:47,254 ERROR main o.s.c.s.b.k.p.KafkaTopicProvisioner:253 - Cannot initialize Binder
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
2017-09-12 18:14:47,255  WARN main o.s.b.c.e.AnnotationConfigEmbeddedWebApplicationContext:550 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
2017-09-12 18:14:47,256  INFO main o.s.i.m.IntegrationMBeanExporter:449 - Unregistering JMX-exposed beans on shutdown
2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:241 - Unregistering JMX-exposed beans
2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: input
2017-09-12 18:14:47,257  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: nullChannel
2017-09-12 18:14:47,258  INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: errorChannel

【问题讨论】:

  • 听起来很简单的场景。您介意在 GitHub 上的某个地方共享该应用程序,以便我们能够在本地重现问题吗?
  • @ArtemBilan 很抱歉,我不能分享我的代码。您需要哪些细节来提出解决方案?
  • 没有代码我没有想法。也许您可以为消费者和生产者共享配置?是的,我知道你不能共享整个应用程序,但至少可以为我们提供一些简单的 Spring Boot 应用程序......
  • @ArtemBilan 添加了配置。
  • 好。谢谢!所以,好吧。 Kafka Binder 版本是什么? “重新部署”是什么意思?如何在本地做到这一点? SCSt 应用程序是一个微服务。我很困惑。

标签: java apache-kafka spring-cloud-stream spring-kafka


【解决方案1】:

看看上面关于 DEBUG 的建议是否揭示了任何进一步的信息。看起来您从 KafkaTopicProvisioner 获得了一些超时异常。但是,当您重新启动我假设的消费者时,就会发生这种情况。看起来消费者以某种方式与经纪人沟通时遇到了一些麻烦,您需要找出那里发生了什么。

【讨论】:

    【解决方案2】:

    好吧,看起来已经有一个bug 报告了spring-cloud-stream-binder-kafka,说明resetOffset 属性无效。因此,消费者总是请求偏移量为latest的消息。

    如 git 问题所述,唯一的解决方法是通过 kafka 消费者 CLI 工具解决此问题。

    【讨论】:

      猜你喜欢
      • 2019-07-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-01-21
      • 1970-01-01
      • 2020-10-16
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多