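【Posted】: 2021-03-17 03:23:51
【Question】:
I am trying to use the Spring Cloud Stream Kafka Streams binder (2.4.3) to consume and produce Avro messages. I can consume messages and produce records with a single function, but I am looking for a producer that can be used in multiple places in the application. StreamBridge seemed like an option, so I tried the approach below, but it does not work. What am I missing?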
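import java.util.function.Consumer;
import org.apache.kafka.streams.kstream.KStream;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.context.annotation.Bean;

@Autowired
StreamBridge streamBridge;

@Bean
public Consumer<KStream<EventKey, Event>> process() {
    return input -> {
        // Forward each record value to the output binding as a side effect.
        input.peek((k, v) -> sendToValue(v));
    };
}

private Event sendToValue(Event event) {
    System.out.println(event);
    // Publish through StreamBridge to the binding named "process-out-0".
    streamBridge.send("process-out-0", event);
    System.out.println("Message sent");
    return event;
}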
Binder configuration:
spring:
  application:
    name: ${applicaton-name}
  cloud:
    stream:
      function:
        definition: process
      bindings:
        process-in-0:
          destination: ${input-topic-name}
          contentType: application/Avro
        process-out-0:
          destination: ${enriched-topic-name}
          contentType: application/Avro
      binding-retry-interval: 30
      kafka:
        streams:
          binder:
            brokers: ${kafka-broker}
            application-id: ${consumer-group-name}
            auto-create-topics: false
            auto-add-partitions: false
            configuration:
              processing.guarantee: at_least_once
              auto.offset.reset: earliest
              schema.registry.url: ${kafka-schema-registry}
              auto-register-schema: false
              security.protocol: SSL
              useNativeEncoding: true
              specific.avro.reader: true
Error:
[2021-03-16 21:51:52,219] [INFO] [latest-c8b6dd0c-b376-4cdd-a72d-17e97701d1d5-StreamThread-1] [o.s.c.s.b.DefaultBinderFactory DefaultBinderFactory.java:243] Creating binder: ktable
[2021-03-16 21:51:52,312] [INFO] [latest-c8b6dd0c-b376-4cdd-a72d-17e97701d1d5-StreamThread-1] [o.s.c.s.b.DefaultBinderFactory DefaultBinderFactory.java:343] Caching the binder: ktable
[2021-03-16 21:51:52,313] [INFO] [latest-c8b6dd0c-b376-4cdd-a72d-17e97701d1d5-StreamThread-1] [o.s.c.s.b.DefaultBinderFactory DefaultBinderFactory.java:347] Retrieving cached binder: ktable
[2021-03-16 21:51:52,313] [INFO] [latest-c8b6dd0c-b376-4cdd-a72d-17e97701d1d5-StreamThread-1] [o.s.c.s.b.DefaultBinderFactory DefaultBinderFactory.java:243] Creating binder: kafka
[2021-03-16 21:51:52,372] [WARN] [latest-c8b6dd0c-b376-4cdd-a72d-17e97701d1d5-StreamThread-1] [o.s.c.a.AnnotationConfigApplicationContext AbstractApplicationContext.java:559] Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaMessageChannelBinder' defined in org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration: Unexpected exception during bean creation; nested exception is java.lang.NoClassDefFoundError: org/springframework/integration/support/management/ManageableLifecycle
[2021-03-16 21:51:52,374] [INFO] [latest-c8b6dd0c-b376-4cdd-a72d-17e97701d1d5-StreamThread-1] [o.s.b.a.l.ConditionEvaluationReportLoggingListener ConditionEvaluationReportLoggingListener.java:136]
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
[2021-03-16 21:51:52,379] [ERROR] [latest-c8b6dd0c-b376-4cdd-a72d-17e97701d1d5-StreamThread-1] [o.s.b.SpringApplication SpringApplication.java:837] Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'kafkaMessageChannelBinder' defined in org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration: Unexpected exception during bean creation; nested exception is java.lang.NoClassDefFoundError: org/springframework/integration/support/management/ManageableLifecycle
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:529)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:324)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:234)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:322)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:202)
Update after changing the version:
[2021-03-17 09:56:28,043] [INFO] [latest-0ea7c0a5-21a4-464b-bf42-a5c35b6687dc-StreamThread-1] [o.a.k.c.u.AppInfoParser AppInfoParser.java:117] Kafka version: 6.0.1-ccs
[2021-03-17 09:56:28,043] [INFO] [latest-0ea7c0a5-21a4-464b-bf42-a5c35b6687dc-StreamThread-1] [o.a.k.c.u.AppInfoParser AppInfoParser.java:118] Kafka commitId: 9c1fbb3db1e0d69d
[2021-03-17 09:56:28,043] [INFO] [latest-0ea7c0a5-21a4-464b-bf42-a5c35b6687dc-StreamThread-1] [o.a.k.c.u.AppInfoParser AppInfoParser.java:119] Kafka startTimeMs: 1615992988043
[2021-03-17 09:56:28,047] [WARN] [kafka-admin-client-thread | adminclient-2] [o.a.k.c.NetworkClient NetworkClient.java:757] [AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[2021-03-17 09:56:28,151] [WARN] [kafka-admin-client-thread | adminclient-2] [o.a.k.c.NetworkClient NetworkClient.java:757] [AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[2021-03-17 09:56:28,357] [WARN] [kafka-admin-client-thread | adminclient-2] [o.a.k.c.NetworkClient NetworkClient.java:757] [AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[2021-03-17 09:56:28,568] [WARN] [kafka-admin-client-thread | adminclient-2] [o.a.k.c.NetworkClient NetworkClient.java:757] [AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[2021-03-17 09:56:28,987] [WARN] [kafka-admin-client-thread | adminclient-2] [o.a.k.c.NetworkClient NetworkClient.java:757] [AdminClient clientId=adminclient-2] Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
【Comments】:
-
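This looks like a dependency issue. I doubt the binder is at 2.4.3; do you mean the Spring Boot version? What version of the binder are you using? Can you show the dependencies from your Maven pom?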
-
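@sobychacko Yes, it does seem to have been a wrong-dependency issue. However, after upgrading the binder version to 3.1.1, the producer tries to connect to localhost instead of the brokers set in the binder configuration. I have updated the question with the new logs.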
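-
One possible explanation for the localhost connection, offered as an assumption rather than something confirmed in this thread: the log shows a separate binder named kafka (kafkaMessageChannelBinder) being created, and StreamBridge sends through that message-channel binder rather than through the Kafka Streams binder. The message-channel binder reads its settings from spring.cloud.stream.kafka.binder, not from spring.cloud.stream.kafka.streams.binder, and its broker list defaults to localhost when unset. A minimal sketch of the extra configuration, reusing the placeholders from the question:

```yaml
spring:
  cloud:
    stream:
      kafka:
        # Assumption: StreamBridge output is handled by the message-channel
        # Kafka binder, which is configured here, separately from
        # kafka.streams.binder above.
        binder:
          brokers: ${kafka-broker}
          configuration:
            security.protocol: SSL
            schema.registry.url: ${kafka-schema-registry}
```

Avro serialization for the process-out-0 producer would probably need its own producer-level settings as well; those are not shown here because the question does not include enough detail to fill them in.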
Tags: apache-kafka-streams spring-kafka spring-cloud-stream kafka-producer-api