【发布时间】:2022-08-15 05:28:27
【问题描述】:
目标
我必须设置一个组号对于 kafka 流使用者,它符合严格的命名约定。
在深入遵循文档后,我找不到有效的方法。由于我仍然认为我可能误解了某些内容,因此我更愿意在此处打开一个问题以供同行评审,然后再打开 spring-cloud-stream github 存储库上的错误问题。
注意:
A similar question 一年前已经问过了,但问题不是很详尽,还没有回答,希望我能在这里对问题有更多的见解。
官方文档说明了什么(也基于 WARN 消息)
从官方文档的几个来源,我看到这应该很容易在我的应用程序的application.yaml 中配置。
该文档指出,我可以:
- 使用
spring.cloud.stream.kafka.default.group=<value>部分为所有活页夹使用默认值 - 或在
spring.cloud.stream.bindings.<channelName>.group中为我的频道使用特定值
如果我在spring.kafka.consumer.group-id 中直接设置kafka 通用字段group-id,则该参数被明确忽略,我得到以下WARN:
2022-08-10 10:18:18.376 [main] [WARN ] [o.s.c.s.b.k.s.p.KafkaStreamsBinderConfigurationProperties] - Ignoring provided value(s) for \'group.id\'. Use spring.cloud.stream.default.group or spring.cloud.stream.binding.<name>.group to specify the group instead of group.id
所以我也尝试了spring.cloud.stream.default.group 和spring.cloud.stream.binding.<name>.group 两个部分(注意这里是binding 而不是bindings,没有s)。
编辑:根据@OlegZhurakousky 的评论,这只是错误消息中的错字。我在有和没有s 的情况下进行了测试,但均未成功。
查看库的代码
我快速浏览了一下流代码,这个属性似乎确实是必须设置的,比如他们在做in their tests,我们可以看到他们使用例如:--spring.cloud.stream.bindings.uppercase-in-0.group=inputGroup。
遵循文档后的问题
在测试所有上述配置后,组 ID 似乎总是被忽略。该组始终设置为默认值,即groupId=process-applicationId。
例如在日志中如下:
2022-08-10 10:30:56.644 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.c.c.i.SubscriptionState] - [Consumer clientId=process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1-consumer, groupId=process-applicationId] Resetting offset for partition my-custom-topic-0 to position FetchPosition{offset=0, offsetEpoch=Optional.empty, currentLeader=LeaderAndEpoch{leader=Optional[kafka:9092 (id: 1 rack: null)], epoch=0}}.
2022-08-10 10:32:56.713 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.s.p.internals.StreamThread] - stream-thread [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update
2022-08-10 10:34:56.767 [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] [INFO ] [o.a.k.s.p.internals.StreamThread] - stream-thread [process-applicationId-c433e54c-2a51-4618-b7a6-14a96b252daf-StreamThread-1] Processed 0 total records, ran 0 punctuators, and committed 0 total tasks since the last update
就像根本不使用组的application.yaml。另一方面,设置destination: my-custom-topic 的spring.cloud.stream.bindings.process-in-0.destination=my-custom-topic 字段被理解并且正确地遵循了主题(参见上面的日志)。
我的应用程序是如何设置的
pom.xml 中的相关依赖项
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.8.6</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>3.1.1</version>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
<version>3.2.4</version>
</dependency>
kafka 流消费者类(简化为仅包含相关部分)
package my.custom.stuff;
import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.function.Consumer;
@Component
public class myKafkaStreamConsumer {
private static final Logger logger = LoggerFactory.getLogger(myKafkaStreamConsumer.class);
@Bean
public static Consumer<KStream<String, String>> process() {
return input ->
input.foreach((key, value) -> {
logger.debug(\"from STREAM: Key= {} , value = {}\", key, value);
// ...
// my message handling business logic
// ...
});
}
}
application.yaml 的一个版本
我在这里放了 application.yaml 的版本,恕我直言,它应该是最符合文档的,但仍然无法正常工作,请注意 destination 已正确使用,因此至少它使用了正确的通道。
spring:
kafka:
bootstrap-servers: kafka:9092
consumer:
auto-offset-reset: earliest
cloud:
stream:
bindings:
process-in-0:
group: myCustomGroupId
destination: \"my-custom-topic\"
我已经测试过的(不成功)
我尝试以多种方式注入组 ID,包括:
- 我可以在任何官方文档或示例中找到的所有可能组合
- 将其添加到
consumer小节中,例如spring.cloud.stream.bindings.process-in-0.consumer.group或spring.cloud.stream.bindings.process-in-0.consumer.group-id - 将官方记录的密钥作为环境变量注入
它似乎总是被忽视。
参考
-
您是否尝试过设置
default组?不是process-in-0? -
@MarkiianBenovskyi,你的意思是
spring.cloud.stream.default.group吗?是的,我试过了 -
您看到的关于单数
binding的错误消息是我们必须修复的类型 它应该是复数spring.cloud.stream.bindings.binding-name.group=hello -
@OlegZhurakousky 感谢您确认这一点,我想过,但还是想试一试
标签: java spring-cloud-stream spring-cloud-stream-binder-kafka