【问题标题】:Spring Kafka concurrency with spring-integrationSpring Kafka 并发与 spring-integration
【发布时间】:2018-05-08 14:02:41
【问题描述】:

我不知道我是否遗漏了一些明显的东西,或者spring-integration-kafka:3.0.1 中存在试图让多个消费者为一个主题运行的错误。该场景是具有 10 个分区的单个 Kafka 主题,以及一个监听它的 springboot-app。相关配置为:

application.yml:

spring:
  kafka:
    consumer:
      group-id: test-consumer
      auto-offset-reset: earliest
    listener:
      concurrency: 4

配置:

@Configuration
@EnableIntegration
@IntegrationComponentScan("com.test")
public class MessageConfig {
    @Bean
    public MessageChannel testReceiveChannel() {
        return MessageChannels.direct().get();
    }

    @Bean
    public IntegrationFlow testReceiveFlow(@Qualifier("kafkaConsumerFactory") final ConsumerFactory<?, ?> kafkaConsumer, final MessageChannel testReceiveChannel) {
        return IntegrationFlows
                .from(Kafka.messageDrivenChannelAdapter(kafkaConsumer, ListenerMode.record, "test-topic"))
                .transform(new JsonToObjectTransformer(EventMessage.class))
                .channel(testReceiveChannel)
                .get();
    }
}

听众:

@Component
public class EventListener {
    private static final Logger LOG = LoggerFactory.getLogger(EventListener.class);

    @ServiceActivator(inputChannel = "testReceiveChannel")
    public void processMessage(final EventMessage message) {
        LOG.info("Got message {} on {}", message.getValue(), Thread.currentThread().getName());
    }
}

开始时,我只有 1 个容器监听所有 10 个分区。我可以看到ConcurrentKafkaListenerContainerFactory 上设置了正确的并发值,但似乎从未调用过initializeContainer 方法(如果我理解正确,它将应用于实际消费者)。然而,我可能正在看完全错误的东西。

知道我忽略了什么吗?

【问题讨论】:

    标签: java spring-integration spring-kafka


    【解决方案1】:

    Spring Boot KafkaProperties(例如 spring.kafka.listener.concurrency = 4)和提到的 ConcurrentKafkaListenerContainerFactory 应用于 @KafkaListener 组件。根本与 Spring Integration 无关。至少是自动的。

    您需要手动完成:

    Kafka.messageDrivenChannelAdapter(kafkaConsumer, ListenerMode.record, "test-topic")
          .configureListenerContainer(c ->
                            c.concurrency(this.kafkaProperties.getListener().getConcurrency()))
    

    【讨论】:

    • 你知道是否有任何其他的监听器属性应该以同样的方式显式设置?
    • 更多属性见KafkaProperties.Listener类,人口见KafkaMessageListenerContainerSpec
    • 抱歉,最后一个问题 - 某些属性(例如 no-poll-threshold 或 monitor-interval)无法使用此方法设置。有没有替代方案?
    • noPollThresholdmonitorInterval 必须转到 ContainerProperties。查看重载的Kafka.messageDrivenChannelAdapter(ConsumerFactory&lt;K, V&gt; consumerFactory, ContainerProperties containerProperties, KafkaMessageDrivenChannelAdapter.ListenerMode listenerMode)
    • 因此您的"test-topic" 也必须转到ContainerProperties
    猜你喜欢
    • 2020-07-25
    • 2017-04-29
    • 2017-07-30
    • 2018-08-28
    • 2017-04-28
    • 2015-11-28
    • 2013-01-16
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多