【Posted】: 2021-02-14 21:46:50
【Question】:
I'm using spring-kafka to implement a topology that converts lowercase to uppercase, like this:
@Bean
public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {
    KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
    // A new processor object is created here per record
    sourceStream.process(() -> new CapitalCaseProcessor());
    ...
}
The processor is not a Spring singleton bean; it is declared as follows:
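import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

public class CapitalCaseProcessor implements Processor<String, String> {
    private ProcessorContext context;
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }
    @Override
    public void process(String key, String value) {
        context.headers().forEach(System.out::println);
    }
    @Override
    public void close() {
        // nothing to release
    }
}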
The processor above is stateful: it holds the ProcessorContext as an instance field.
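To make the question concrete, here is a plain-Java sketch with no Kafka dependency. It assumes, as I understand the Kafka Streams model, that the supplier passed to process() is invoked once per stream task when the task is created, not once per record. FakeContext, FakeProcessor and runTask are hypothetical stand-ins for ProcessorContext, Processor and a stream task; they are not Kafka APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Plain-Java model, NOT Kafka Streams code: FakeContext, FakeProcessor and
// runTask are hypothetical stand-ins used only to show when the supplier runs.
public class PerTaskSupplierDemo {

    // Stand-in for ProcessorContext: one per task.
    static final class FakeContext {
        final String taskId;
        FakeContext(String taskId) { this.taskId = taskId; }
    }

    // Mirrors CapitalCaseProcessor: stores the context from init() in a field.
    static final class FakeProcessor {
        private FakeContext context;
        void init(FakeContext context) { this.context = context; }
        String process(String value) {
            return context.taskId + ":" + value.toUpperCase();
        }
    }

    // A task asks the supplier for ONE processor, initialises it, then feeds it every record.
    static List<String> runTask(String taskId, Supplier<FakeProcessor> supplier, List<String> values) {
        FakeProcessor processor = supplier.get();
        processor.init(new FakeContext(taskId));
        List<String> out = new ArrayList<>();
        for (String value : values) {
            out.add(processor.process(value));
        }
        return out;
    }

    public static void main(String[] args) {
        AtomicInteger supplierCalls = new AtomicInteger();
        Supplier<FakeProcessor> supplier = () -> {
            supplierCalls.incrementAndGet();
            return new FakeProcessor(); // like () -> new CapitalCaseProcessor()
        };
        System.out.println(runTask("0_0", supplier, List.of("a", "b", "c"))); // [0_0:A, 0_0:B, 0_0:C]
        System.out.println(runTask("0_1", supplier, List.of("x", "y")));      // [0_1:X, 0_1:Y]
        System.out.println(supplierCalls.get()); // 2: one call per task, five records in total
    }
}
```

With a fresh instance per supplier call, each processor only ever sees the context of the task that initialised it.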
Now, what happens if we turn the stateful CapitalCaseProcessor into a Spring singleton bean?
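import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.springframework.stereotype.Component;

@Component
public class CapitalCaseProcessor implements Processor<String, String> {
    // Is the ProcessorContext going to have thread-safety issues now?
    private ProcessorContext context;
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }
    @Override
    public void process(String key, String value) {
        context.headers().forEach(System.out::println);
    }
    @Override
    public void close() {
        // nothing to release
    }
}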
and then inject it as a Spring bean into the main topology:
@Configuration
public class UppercaseTopologyProcessor {
    @Autowired
    CapitalCaseProcessor capitalCaseProcessor;

    @Bean
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {
        KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
        // A singleton Spring bean processor is now used for all the records
        sourceStream.process(() -> capitalCaseProcessor);
        ...
    }
}
Will this cause thread-safety issues in CapitalCaseProcessor, since it holds the ProcessorContext as state?
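A minimal plain-Java sketch of what `() -> capitalCaseProcessor` implies, under the same assumption that the supplier is called once per task: the second task's init() overwrites the field the first task's processor relies on. SharedCtx, SharedProcessor and taskSeenByFirstTask are hypothetical names. The overwrite shows up even on a single thread; with several StreamThreads it would also be an unsynchronized data race.

```java
import java.util.function.Supplier;

// Plain-Java model, NOT Kafka Streams code. SharedCtx and SharedProcessor are
// hypothetical stand-ins for ProcessorContext and CapitalCaseProcessor.
public class SharedSupplierDemo {

    static final class SharedCtx {
        final String taskId;
        SharedCtx(String taskId) { this.taskId = taskId; }
    }

    static final class SharedProcessor {
        private SharedCtx context; // written by every init() call on this object
        void init(SharedCtx context) { this.context = context; }
        String currentTask() { return context.taskId; }
    }

    // Two "tasks" each obtain a processor and initialise it; returns the task id
    // that the FIRST task's processor sees afterwards.
    static String taskSeenByFirstTask(Supplier<SharedProcessor> supplier) {
        SharedProcessor first = supplier.get();
        first.init(new SharedCtx("0_0"));
        SharedProcessor second = supplier.get();
        second.init(new SharedCtx("0_1"));
        return first.currentTask();
    }

    public static void main(String[] args) {
        SharedProcessor singleton = new SharedProcessor();
        // Like () -> capitalCaseProcessor: the same object for every task.
        System.out.println(taskSeenByFirstTask(() -> singleton));       // 0_1 (context overwritten)
        // Like () -> new CapitalCaseProcessor(): a fresh object per task.
        System.out.println(taskSeenByFirstTask(SharedProcessor::new));  // 0_0
    }
}
```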
Or is it better to declare it as a prototype bean, like this?
@Configuration
public class UppercaseTopologyProcessor {
    @Lookup
    public CapitalCaseProcessor getCapitalCaseProcessor() {
        return null;
    }

    @Bean
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {
        KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
        // Each supplier call looks the bean up again; this only yields a new instance
        // if CapitalCaseProcessor is prototype-scoped (e.g. @Scope("prototype"))
        sourceStream.process(() -> getCapitalCaseProcessor());
        ...
    }
}
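The @Lookup variant only helps if CapitalCaseProcessor is actually prototype-scoped; for a singleton-scoped bean the lookup keeps returning the same object. One way to catch that mistake early is a hypothetical wrapper like FreshInstanceGuard below (not part of Spring or Kafka Streams), which fails if a supplier ever returns an instance it has returned before.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical helper, not a Spring or Kafka Streams API: wraps a supplier and
// fails fast if it hands out an instance it has already handed out.
public final class FreshInstanceGuard {

    private FreshInstanceGuard() {}

    static <T> Supplier<T> wrap(Supplier<T> delegate) {
        // Identity-based set (reference equality, not equals()), synchronized because
        // suppliers may be called from more than one thread.
        Set<T> seen = Collections.synchronizedSet(
                Collections.newSetFromMap(new IdentityHashMap<T, Boolean>()));
        return () -> {
            T instance = delegate.get();
            if (!seen.add(instance)) {
                throw new IllegalStateException(
                        "supplier returned an instance it had already returned: " + instance);
            }
            return instance;
        };
    }

    public static void main(String[] args) {
        Supplier<Object> fresh = wrap(Object::new);
        fresh.get();
        fresh.get();                 // OK: a new object on every call
        System.out.println("fresh supplier accepted");

        Object singleton = new Object();
        Supplier<Object> shared = wrap(() -> singleton);
        shared.get();
        try {
            shared.get();            // same object again
        } catch (IllegalStateException e) {
            System.out.println("shared supplier rejected");
        }
    }
}
```

In a unit test you could wrap `() -> getCapitalCaseProcessor()` and call get() twice; with a singleton-scoped bean the second call throws. The guard keeps a reference to every instance it has handed out, so it suits tests rather than production.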
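Update: basically, I want to know two things:
- Should a processor instance be tied to each stream record, as in the Akka actor model where actors are stateful and handle one request at a time, or can it be a singleton object?
- Is ProcessorContext thread-safe?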
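On the second point, my understanding (hedged; worth checking against the Kafka Streams documentation) is that ProcessorContext is not documented as thread-safe. Kafka Streams relies on thread confinement instead: each stream task, with its processor instances and context, is owned by one StreamThread at a time, and per-record accessors such as headers() describe the record currently being processed. The plain-Java sketch below models that confinement with an ExecutorService; ConfinedProcessor and runOnOwnThreads are hypothetical names, and a real StreamThread is not a thread-pool task.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;

// Plain-Java model of thread confinement, NOT Kafka Streams internals.
// ConfinedProcessor and runOnOwnThreads are hypothetical names.
public class ThreadConfinementDemo {

    // Holds unsynchronized mutable state, like a processor that keeps its context.
    static final class ConfinedProcessor {
        private String currentValue;                  // per-record state
        private final StringBuilder seen = new StringBuilder();

        void process(String value) {
            currentValue = value;                     // now describes this record only
            seen.append(currentValue.toUpperCase());
        }

        String seen() { return seen.toString(); }
    }

    // One worker per "partition"; each worker obtains its own processor and is
    // the only thread that ever touches it.
    static List<String> runOnOwnThreads(Supplier<ConfinedProcessor> supplier,
                                        List<List<String>> partitions) {
        ExecutorService pool = Executors.newFixedThreadPool(partitions.size());
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (List<String> partition : partitions) {
                futures.add(pool.submit(() -> {
                    ConfinedProcessor processor = supplier.get();
                    partition.forEach(processor::process);
                    return processor.seen();
                }));
            }
            List<String> results = new ArrayList<>();
            for (Future<String> future : futures) {
                try {
                    results.add(future.get());
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException(e);
                } catch (ExecutionException e) {
                    throw new IllegalStateException(e.getCause());
                }
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> results = runOnOwnThreads(ConfinedProcessor::new,
                List.of(List.of("a", "b"), List.of("x", "y", "z")));
        System.out.println(results); // [AB, XYZ]
    }
}
```

Because every simulated thread obtains its own instance from the supplier, the unsynchronized StringBuilder is never shared; a supplier returning one shared instance would remove that guarantee.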
【Comments】:
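- Making it a prototype bean won't make any difference, because @Autowired is only applied once, during startup. I don't know Kafka Streams internals well enough to say whether the context is thread-safe (though I would expect it to be).
- @GaryRussell Thanks! I've updated my question to provide more context.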
Tags: apache-kafka-streams spring-kafka