【发布时间】:2021-06-08 00:56:03
【问题描述】:
我有一个用例,我将在同一主题中接收两种对象。我面临的问题是所有的对象都去同一个听众。第二个监听器没有被使用。
我的听众类似于下面
@EnableKafka
@Service
public class consumerService {
@KafkaListener(topic = "t", groupId = "g" containerFactory = "fooContainerFactory")
public void consume1(Foo foo) {
logger.info("I got : " + foo);
}
@KafkaListener(topic = "t", groupId = "g" containerFactory = "barContainerFactory")
public void consume2(Bar bar) {
logger.info("I got : " + bar);
}
}
Foo 的容器工厂如下,Bar 也有类似的。
@Configuration
public class MyConsumerConfig {
@Value(value = "${kafka.bootstrapAddress}")
private String bootstrapAddress;
@Value(value = "${group.id}")
private String groupId;
public Map<String, Object> clickConsumerFactory() {
/** Set the consumer properties here */
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
return props;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Foo> clickKafkaListenerContainerFactory() {
/** Listener factory for the class 'Foo' */
ConcurrentKafkaListenerContainerFactory<String, Foo> factory
= new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(clickConsumerFactory()));
factory.setMessageConverter(new StringJsonMessageConverter());
factory.setErrorHandler(((exception, data) -> LOGGER.info("There was an error " + exception)));
return factory;
}
}
我尝试使用 KafkaHandler,但我不知道如何指定单个容器工厂(对于 Foo:FooContainerFactory,对于 Bar:BarContainerFactory) 我被困了几个小时。谁能告诉我应该改变什么?
【问题讨论】:
-
如果您的主题有单个分区,您的消费者之一将处于空闲状态。因为它们属于同一个消费者组(同一个 groupId)。
标签: java spring-boot apache-kafka kafka-consumer-api spring-kafka