【发布时间】:2017-09-12 12:59:34
【问题描述】:
我们有一个生产者、一个消费者和一个分区。消费者/生产者都是 Spring Boot 应用程序。消费者应用程序在我的本地机器上运行,而生产者与 kafka 和 zookeeper 在远程机器上运行。
在开发过程中,我重新部署了我的生产者应用程序并进行了一些更改。但在那之后,我的消费者没有收到任何消息。我尝试重新启动消费者,但没有运气。可能是什么问题和/或如何解决?
消费者配置:
spring:
cloud:
stream:
defaultBinder: kafka
bindings:
input:
destination: sales
content-type: application/json
kafka:
binder:
brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
defaultZkPort: 2181
defaultBrokerPort: 9092
server:
port: 0
生产者配置:
cloud:
stream:
defaultBinder: kafka
bindings:
output:
destination: sales
content-type: application/json
kafka:
binder:
brokers: ${SERVICE_REGISTRY_HOST:127.0.0.1}
zkNodes: ${SERVICE_REGISTRY_HOST:127.0.0.1}
defaultZkPort: 2181
defaultBrokerPort: 9092
EDIT2:
消费者应用程序在 5 分钟后终止,但出现以下异常:
2017-09-12 18:14:47,254 ERROR main o.s.c.s.b.k.p.KafkaTopicProvisioner:253 - Cannot initialize Binder
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
2017-09-12 18:14:47,255 WARN main o.s.b.c.e.AnnotationConfigEmbeddedWebApplicationContext:550 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'inputBindingLifecycle'; nested exception is org.springframework.cloud.stream.binder.BinderException: Cannot initialize binder:
2017-09-12 18:14:47,256 INFO main o.s.i.m.IntegrationMBeanExporter:449 - Unregistering JMX-exposed beans on shutdown
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:241 - Unregistering JMX-exposed beans
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: input
2017-09-12 18:14:47,257 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: nullChannel
2017-09-12 18:14:47,258 INFO main o.s.i.m.IntegrationMBeanExporter:375 - Summary on shutdown: errorChannel
【问题讨论】:
-
听起来很简单的场景。您介意在 GitHub 上的某个地方共享该应用程序,以便我们能够在本地重现问题吗?
-
@ArtemBilan 很抱歉,我不能分享我的代码。您需要哪些细节来提出解决方案?
-
没有代码我没有想法。也许您可以为消费者和生产者共享配置?是的,我知道你不能共享整个应用程序,但至少可以为我们提供一些简单的 Spring Boot 应用程序......
-
@ArtemBilan 添加了配置。
-
好。谢谢!所以,好吧。 Kafka Binder 版本是什么? “重新部署”是什么意思?如何在本地做到这一点? SCSt 应用程序是一个微服务。我很困惑。
标签: java apache-kafka spring-cloud-stream spring-kafka