【发布时间】:2018-05-08 14:02:41
【问题描述】:
我不知道我是否遗漏了一些明显的东西,或者spring-integration-kafka:3.0.1 中存在试图让多个消费者为一个主题运行的错误。该场景是具有 10 个分区的单个 Kafka 主题,以及一个监听它的 springboot-app。相关配置为:
application.yml:
spring:
kafka:
consumer:
group-id: test-consumer
auto-offset-reset: earliest
listener:
concurrency: 4
配置:
@Configuration
@EnableIntegration
@IntegrationComponentScan("com.test")
public class MessageConfig {
@Bean
public MessageChannel testReceiveChannel() {
return MessageChannels.direct().get();
}
@Bean
public IntegrationFlow testReceiveFlow(@Qualifier("kafkaConsumerFactory") final ConsumerFactory<?, ?> kafkaConsumer, final MessageChannel testReceiveChannel) {
return IntegrationFlows
.from(Kafka.messageDrivenChannelAdapter(kafkaConsumer, ListenerMode.record, "test-topic"))
.transform(new JsonToObjectTransformer(EventMessage.class))
.channel(testReceiveChannel)
.get();
}
}
听众:
@Component
public class EventListener {
private static final Logger LOG = LoggerFactory.getLogger(EventListener.class);
@ServiceActivator(inputChannel = "testReceiveChannel")
public void processMessage(final EventMessage message) {
LOG.info("Got message {} on {}", message.getValue(), Thread.currentThread().getName());
}
}
开始时,我只有 1 个容器监听所有 10 个分区。我可以看到ConcurrentKafkaListenerContainerFactory 上设置了正确的并发值,但似乎从未调用过initializeContainer 方法(如果我理解正确,它将应用于实际消费者)。然而,我可能正在看完全错误的东西。
知道我忽略了什么吗?
【问题讨论】:
标签: java spring-integration spring-kafka