【问题标题】:Configured group ID is ignored in spring-cloud-streamsspring-cloud-streams 中忽略配置的组 ID
【发布时间】:2022-08-15 05:28:27
【问题描述】:

目标

我必须设置一个组号对于 kafka 流使用者,它符合严格的命名约定。

在深入遵循文档后,我找不到有效的方法。由于我仍然认为我可能误解了某些内容,因此我更愿意在此处打开一个问题以供同行评审,然后再打开 spring-cloud-stream github 存储库上的错误问题。

注意:

A similar question 一年前已经问过了,但问题不是很详尽,还没有回答,希望我能在这里对问题有更多的见解。

官方文档说明了什么(也基于 WARN 消息)

从官方文档的几个来源,我看到这应该很容易在我的应用程序的application.yaml 中配置。

该文档指出,我可以:

  • 使用spring.cloud.stream.kafka.default.group=<value> 部分为所有活页夹使用默认值
  • 或在spring.cloud.stream.bindings.<channelName>.group 中为我的频道使用特定值

如果我在spring.kafka.consumer.group-id 中直接设置kafka 通用字段group-id,则该参数被明确忽略,我得到以下WARN

2022-08-10 10:18:18.376 [main] [WARN ] [o.s.c.s.b.k.s.p.KafkaStreamsBinderConfigurationProperties] - Ignoring provided value(s) for \'group.id\'. Use spring.cloud.stream.default.group or spring.cloud.stream.binding.<name>.group to specify the group instead of group.id

所以我也尝试了spring.cloud.stream.default.groupspring.cloud.stream.binding.&lt;name&gt;.group 两个部分(注意这里是binding 而不是bindings,没有s)。

编辑:根据@OlegZhurakousky 的评论,这只是错误消息中的错字。我在有和没有s 的情况下进行了测试,但均未成功。

查看库的代码

我快速浏览了一下流代码,这个属性似乎确实是必须设置的,比如他们在做in their tests,我们可以看到他们使用例如:--spring.cloud.stream.bindings.uppercase-in-0.group=inputGroup

遵循文档后的问题

在测试所有上述配置后,组 ID 似乎总是被忽略。该组始终设置为默认值,即groupId=process-applicationId

例如在日志中如下:

2022-08-10 10:30:56.644 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.c.c.i.SubscriptionState] - [Consumer clientId=process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1-consumer, groupId=process-applicationId] Resetting offset for partition my-custom-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka:9092 (id: 1 rack: null)], epoch=0}}.
2022-08-10 10:32:56.713 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.s.p.internals.StreamThread] - stream-thread [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update
2022-08-10 10:34:56.767 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.s.p.internals.StreamThread] - stream-thread [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update

就像根本不使用组的application.yaml。另一方面,设置destination: my-custom-topicspring.cloud.stream.bindings.process-in-0.destination=my-custom-topic 字段被理解并且正确地遵循了主题(参见上面的日志)。

我的应用程序是如何设置的

pom.xml 中的相关依赖项

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.8.6</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>3.1.1</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
            <version>3.2.4</version>
        </dependency>

kafka 流消费者类(简化为仅包含相关部分)


package my.custom.stuff;


import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;

@Component
public class myKafkaStreamConsumer {

    private static final Logger logger = LoggerFactory.getLogger(myKafkaStreamConsumer.class);

    @Bean
    public static Consumer<KStream<String, String>> process() {
        return input ->
                input.foreach((key, value) -> {
                    logger.debug(\"from STREAM: Key= {} , value = {}\", key, value);
                    // ...
                    // my message handling business logic
                    // ...
                });
    }
}

application.yaml 的一个版本

我在这里放了 application.yaml 的版本,恕我直言,它应该是最符合文档的,但仍然无法正常工作,请注意 destination 已正确使用,因此至少它使用了正确的通道。

spring:
  kafka:
    bootstrap-servers: kafka:9092
    consumer:
      auto-offset-reset: earliest
  cloud:
    stream:
      bindings:
        process-in-0:
          group: myCustomGroupId
          destination: \"my-custom-topic\"

我已经测试过的(不成功)

我尝试以多种方式注入组 ID,包括:

  • 我可以在任何官方文档或示例中找到的所有可能组合
  • 将其添加到consumer 小节中,例如spring.cloud.stream.bindings.process-in-0.consumer.groupspring.cloud.stream.bindings.process-in-0.consumer.group-id
  • 将官方记录的密钥作为环境变量注入

它似乎总是被忽视。

参考

  • 您是否尝试过设置default 组?不是process-in-0
  • @MarkiianBenovskyi,你的意思是 spring.cloud.stream.default.group 吗?是的,我试过了
  • 您看到的关于单数 binding 的错误消息是我们必须修复的类型 它应该是复数 spring.cloud.stream.bindings.binding-name.group=hello
  • @OlegZhurakousky 感谢您确认这一点,我想过,但还是想试一试

标签: java spring-cloud-stream spring-cloud-stream-binder-kafka


【解决方案1】:

有点免责声明,我对 Spring 有点生疏,但由于过去几个月我一直在使用 Kafka,所以我也想玩这个。 我通过做两件事让它工作:

  • 在应用程序属性中使用 applicationId 而不是 group

    spring:
      kafka:
        bootstrap-servers: localhost:29092
        consumer:
          auto-offset-reset: earliest
      cloud:
        stream:
          kafka:
            binder:
              functions:
                process:
                  applicationId: MyGroupIdUsingApplicationId
          bindings:
            process-in-0:
              bindings:
                process-in-0:
                  destination: my-custom-topic
    
    
  • 显式声明一个KafkaBinderConfigurationProperties bean

我在这里创建了一个工作示例,供您在需要时进行克隆和测试: https://github.com/T-TK-Wan/SO-Spring_Cloud_Streams_Kafka_GroupId

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-01-03
    • 2021-02-24
    • 2015-06-07
    • 2018-06-21
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多