【问题标题】:Spring Integration: Reading from Kafka QueueSpring 集成:从 Kafka 队列中读取
【发布时间】:2018-07-31 22:15:23
【问题描述】:

在 Spring Boot 应用程序中,我想使用 Spring Integration 从 Kafka 队列中读取数据。配置如下:

@Bean
public KafkaMessageDrivenChannelAdapter<String, String>
adapter(KafkaMessageListenerContainer<String, String> container) {
    KafkaMessageDrivenChannelAdapter<String, String> kafkaMessageDrivenChannelAdapter =
            new KafkaMessageDrivenChannelAdapter<>(container);
    kafkaMessageDrivenChannelAdapter.setOutputChannel(receiver());
    return kafkaMessageDrivenChannelAdapter;
}

@Bean
public KafkaMessageListenerContainer<String, String> container() throws Exception {
    ContainerProperties properties = new ContainerProperties(this.topic);
    return new KafkaMessageListenerContainer<>(consumerFactory(), properties);
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = ... // set properties
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public DirectChannel receiver() {
    return new DirectChannel();
}

@Autowired
private Resolver resolver;

@Bean
public EventDrivenConsumer getEventDrivenConsumer() {
    return new EventDrivenConsumer(receiver(), resolver);
}

Resolver bean 实现了MessageHandler

消息在队列中被接收,但不被解析器 bean 处理。

Spring Boot应用注解如下:

@SpringBootApplication(exclude = KafkaAutoConfiguration.class)

所以应该没有 Kafka bean 的自动配置。

以下是错误:

java.lang.NullPointerException: null
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:188) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:72) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:47) ~[spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:792) [spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:736) [spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access$2100(KafkaMessageListenerContainer.java:246) [spring-kafka-1.1.7.RELEASE.jar:na]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1025) [spring-kafka-1.1.7.RELEASE.jar:na]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_20]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_20]
    at java.lang.Thread.run(Thread.java:745) [na:1.8.0_20]

调试,很明显RecordMessagingMessageListenerAdapter(堆栈跟踪的顶部)中,this.methodHandler 为空。

Spring Integration 中将通道连接到应该处理消息的 bean 的正确方法是什么?

【问题讨论】:

  • 看起来您的应用程序中还有其他内容未显示在此配置中。根据您的堆栈跟踪,与KafkaMessageDrivenChannelAdapter 没有任何交互。那个注入一个IntegrationRecordMessageListener 侦听器,而不是上面提到的RecordMessagingMessageListenerAdapter。也许你可以在 GitHub 上的某个地方与我们分享这个 Spring Boot 项目?
  • 阿尔乔姆,谢谢。已更新问题以指出我正在使用 @SpringBootApplication(exclude = KafkaAutoConfiguration.class) 来明确排除 Kafka 的自动配置。
  • 谢谢。这仍然无济于事。也许在 NPE 堆栈跟踪的日志中还有其他内容?您还可以打开org.springframework.kafka 的 DEBUG 日志记录级别以跟踪调用的内容和方式。也许你会弄清楚自己是谁以及如何在没有handlerMethod 的情况下调用某个侦听器容器。可能你有@EnableKafka 和其他什么?
  • 阿尔乔姆,谢谢。没有@EnableKafka。通过调试,我看到:s.i.k.i.KafkaMessageDrivenChannelAdapter:启动适配器,在 o.s.k.l.KafkaMessageListenerContainer 之后不久:分区撤销:[]。我知道 KafkaMessageListenerContainer 不应该被实例化,对吗?
  • 不,它必须是因为它是一个 bean container() 并且它是 KafkaMessageDrivenChannelAdapter 所必需的。

标签: java spring-integration spring-kafka


【解决方案1】:

这是决议:

项目的父声明如下:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.10.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

对于以下依赖:

<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-kafka</artifactId>
    <version>${spring-integration-kafka.version}</version>
</dependency>

以前使用过,用于spring-integration-kafka.version:

3.0.1.RELEASE

改为:

2.1.0.RELEASE

一切正常。

但是,如果没有明确的 spring-integration-kafka 版本,项目将无法构建,因为缺少类。

Boot 的优点之一是它可以处理依赖版本。也许应该有一个 spring-boot-integration-kafka 依赖项,它可以防止这个问题。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2023-03-31
    • 2022-01-04
    • 2021-05-23
    • 2019-07-06
    • 1970-01-01
    • 2016-07-09
    • 2016-06-02
    • 2016-09-14
    相关资源
    最近更新 更多