【问题标题】:how to configure two instances of Kafka StreamsBuilderFactoryBean in spring bootspring boot中如何配置Kafka StreamsBuilderFactoryBean的两个实例
【发布时间】:2019-07-27 07:34:23
【问题描述】:

使用 spring-boot-2.1.3、spring-kafka-2.2.4,我想要两个流配置(例如,具有不同的 application.id,或连接到不同的集群等)。所以我几乎根据文档定义了第一个流配置,然后添加了第二个,具有不同的名称,以及第二个 StreamsBuilderFactoryBean(也具有不同的名称):

@Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
public KafkaStreamsConfiguration kStreamsConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myappId1000");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    //...
    return new KafkaStreamsConfiguration(props);
}

@Bean(name = "myKappConfig")
public KafkaStreamsConfiguration myKafkaAppIdConfiguration() {
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "myappId9999");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    //...
    return new KafkaStreamsConfiguration(props);
}

@Bean(name = "myKappConfigStreamBuilder")
public StreamsBuilderFactoryBean myAppStreamBuilder(
        @Qualifier("myKappConfig") KafkaStreamsConfiguration myKafkaAppIdConfiguration) {
    return new StreamsBuilderFactoryBean(myKafkaAppIdConfiguration);
}

但是,当我尝试运行该应用程序时,我得到:

方法kafkaStreamsFactoryBeanConfigurer的参数0 org.springframework.boot.autoconfigure.kafka.KafkaStreamsAnnotationDrivenConfiguration 需要一个 bean,但找到了 2 个: - &defaultKafkaStreamsBuilder:由类路径资源中的“defaultKafkaStreamsBuilder”方法定义 [org/springframework/kafka/annotation/KafkaStreamsDefaultConfiguration.class] - &myKappConfigStreamBuilder:由类路径资源中的“myAppStreamBuilder”方法定义 [com/teramedica/kafakaex001web/KafkaConfig.class]

因为 spring-boot 自动配置中的代码:

@Bean
public KafkaStreamsFactoryBeanConfigurer kafkaStreamsFactoryBeanConfigurer(
        StreamsBuilderFactoryBean factoryBean) {
    return new KafkaStreamsFactoryBeanConfigurer(this.properties, factoryBean);
}

没有完全替换 KafkaStreamsAnnotationDrivenConfiguration,我如何定义多个 StreamsBuilderFactoryBean。或者,如何更改给定流的属性?

【问题讨论】:

  • 如我所见,您配置了自己的StreamsBuilderFactoryBean,其中一个是在KafkaStreamsDefaultConfiguration 中自动配置的。如果您需要自己定义两个流构建器,也许您不需要自动配置中的 bean?所以只需排除此类自动配置,或删除@EnableKafkaStreams
  • 只需要标记一个为@Primary;不过,引导可能应该更宽松一些。
  • 应该标记为@Primary 的那个是在KafkaStreamsDefaultConfiguration 中定义的,并且是spring 代码,不是我的,所以我不能标记它(至少不能在类本身上使用注释)。或者,可以限定在 KafkaStreamsAnnotationDrivenConfiguration#kafkaStreamsFactoryBeanConfigurer 中使用的 StreamsBuilderFactoryBean。但同样,不是我的代码。我可以删除 EnableKafkaStreams,但是当我真的只想添加时,我正在复制现有代码。似乎我应该能够在不从头开始的情况下定义多个配置。
  • 抱歉 - 我没有收到关于您的问题的 cmets 通知,只有我的回答,所以我没有看到这个。在那里查看我的回复;我同意引导不应该在这种情况下呕吐。

标签: spring-boot apache-kafka-streams spring-kafka


【解决方案1】:

@Primary标记一个工厂bean。

【讨论】:

  • 有没有办法标记在现有代码中定义的 bean @Primary?我想将 KafkaStreamsDefaultConfiguration.defaultKafkaStreamsBuilder (弹簧代码)标记为主要。虽然我想我基本上可以复制那个所做的,将 that 标记为主要的,然后添加另一个。那么原始的 bean 就不会被使用,这有点笨拙。我想知道最后是否像@VasiliySarzhynskyi 所建议的那样,禁用自动配置并自己做会更干净。
  • 你需要重新定义他们的bean。 &gt;bean would just be unused 不,它根本不存在,它将被您的 bean 定义所取代。我建议你打开一个针对 Boot 的问题,要求他们更优雅地处理你的用例。
  • > 你需要重新定义他们的bean。 >bean 只是未使用,它根本不存在,它将被您的 bean 定义替换。我还需要删除EnableKafkaStreams,因为没有条件。但我看那是那里唯一的豆子,无论如何,
  • 刚刚遇到同样的问题并尝试排除 KafkaStreamsAnnotationDrivenConfiguration,但失败并显示“无法排除以下类,因为它们不是自动配置类:-org.springframework.boot.autoconfigure.kafka。 KafkaStreamsAnnotationDrivenConfiguration' 是预期的吗? KafkaStreamsAnnotationDrivenConfiguration 属于包范围。
  • 不要在 cmets 中提出新问题;如果排除不起作用,则可能是启动中的错误;提出一个包含更多详细信息的新问题,或者,如果您认为这是一个错误,请针对 Boot 提出问题。
猜你喜欢
  • 2018-07-10
  • 2017-04-27
  • 2021-09-10
  • 1970-01-01
  • 1970-01-01
  • 2019-07-14
  • 1970-01-01
  • 2020-02-09
  • 1970-01-01
相关资源
最近更新 更多