【问题标题】:Listen to multiple type of objects in the same Kafka topic在同一个 Kafka 主题中监听多种类型的对象
【发布时间】:2021-06-08 00:56:03
【问题描述】:

我有一个用例,我将在同一主题中接收两种对象。我面临的问题是所有的对象都去同一个听众。第二个监听器没有被使用。

我的听众类似于下面

@EnableKafka
@Service
public class consumerService {

  @KafkaListener(topic = "t", groupId = "g" containerFactory = "fooContainerFactory")
  public void consume1(Foo foo) {
     logger.info("I got : " + foo);
  }

  @KafkaListener(topic = "t", groupId = "g" containerFactory = "barContainerFactory")
  public void consume2(Bar bar) {
     logger.info("I got : " + bar);
  }

}

Foo 的容器工厂如下,Bar 也有类似的。

@Configuration
public class MyConsumerConfig {


  @Value(value = "${kafka.bootstrapAddress}")
  private String bootstrapAddress;

  @Value(value = "${group.id}")
  private String groupId;

  public Map<String, Object> clickConsumerFactory() {
    /** Set the consumer properties here */
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    return props;
  }

  @Bean
  public ConcurrentKafkaListenerContainerFactory<String, Foo> clickKafkaListenerContainerFactory() {
    /** Listener factory for the class 'Foo' */
    ConcurrentKafkaListenerContainerFactory<String, Foo> factory
            = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(clickConsumerFactory()));
    factory.setMessageConverter(new StringJsonMessageConverter());
    factory.setErrorHandler(((exception, data) -> LOGGER.info("There was an error " + exception)));
    return factory;
  }
}

我尝试使用 KafkaHandler,但我不知道如何指定单个容器工厂(对于 Foo:FooContainerFactory,对于 Bar:BarContainerFactory) 我被困了几个小时。谁能告诉我应该改变什么?

【问题讨论】:

  • 如果您的主题有单个分区,您的消费者之一将处于空闲状态。因为它们属于同一个消费者组(同一个 groupId)。

标签: java spring-boot apache-kafka kafka-consumer-api spring-kafka


【解决方案1】:

你不能那样做。如果同一个主题同时包含 Foo 和 Bar 对象;框架无法过滤掉不匹配的记录。

而且,正如在其他 cmets 中所说,答案,因为它们都在同一个组中,所以它们将各自获得一个记录子集。

最简单的解决方案是使用两个不同的主题。

如果这不可能,您必须使用带有@KafkaHandler 方法的类级侦听器,但是您需要在反序列化器中进行转换,而不是框架从方法参数中找出类型。

请参阅 this answer 以获取指向示例的链接,该示例说明了如何执行此操作。

【讨论】:

    【解决方案2】:

    我可能会离开这里。因为我不确定@KafkaListener 是如何工作的。但是,在我的脑海中,有可能只有一个分区并且第一个 kakfaListener 被分配给它。因此,任何新的侦听器(类似于 kafka 消费者)都不会被分配任何分区,因为开始时只有 1 个分区。因此,新的侦听器将处于空闲状态,等待分配某个分区,以便它们可以开始读取数据。

    您可以通过禁用第一个 KafkaListener 然后运行您的应用程序来验证这一点。第二个监听器会自动启动。

    或者,如果您的主题已经有多个分区,那么它可能会因为消息的键而发生,因为键可用于将数据发送到一个分区。并且您的第一个侦听器被分配到该分区。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-08-31
      • 2019-02-22
      • 2018-11-02
      • 1970-01-01
      • 1970-01-01
      • 2023-03-07
      • 2022-12-13
      • 2019-05-27
      相关资源
      最近更新 更多