【Question title】: Can a record processor be a Spring singleton bean?
【Posted】: 2021-02-14 21:46:50
【Question】:

I'm using spring-kafka to implement a topology that converts lowercase to uppercase, as follows:

    @Bean
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {

        KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

        // A new processor object is created here per record
        sourceStream.process(() -> new CapitalCaseProcessor());
        ...
    }

The processor is not a Spring singleton bean; it is declared as follows:

public class CapitalCaseProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        context.headers().forEach(System.out::println);
    }

    @Override
    public void close() {
        // nothing to release
    }
}

The processor above is stateful: it holds the ProcessorContext as state.

Now, what happens if we turn the stateful CapitalCaseProcessor into a Spring singleton bean?

@Component
public class CapitalCaseProcessor implements Processor<String, String> {

    // Is the ProcessorContext going to have a thread-safety issue now?
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        context.headers().forEach(System.out::println);
    }

    @Override
    public void close() {
        // nothing to release
    }
}

and try to inject it into the main topology as a Spring bean:

@Configuration
public class UppercaseTopologyProcessor {

    @Autowired
    CapitalCaseProcessor capitalCaseProcessor;

    @Bean
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {

        KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

        // The same singleton Spring bean processor is now used for all records
        sourceStream.process(() -> capitalCaseProcessor);
        ...
    }
}

Will this now cause thread-safety issues in CapitalCaseProcessor, since it holds the ProcessorContext as state?

Or is it better to declare it as a prototype bean, like this?

@Configuration
public class UppercaseTopologyProcessor {

    // @Lookup: Spring overrides this method to call getBean(); CapitalCaseProcessor
    // must be prototype-scoped for each call to return a new instance
    @Lookup
    public CapitalCaseProcessor getCapitalCaseProcessor() {
        return null;
    }

    @Bean
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {

        KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

        // The supplier looks up a (prototype) processor bean each time it is called
        sourceStream.process(() -> getCapitalCaseProcessor());
        ...
    }
}

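Update: basically, I want to know two things:

  1. Should a processor instance be tied to each stream record, like the Akka actor model, where actors are stateful and work per request, or can it be a singleton object?
  2. Is ProcessorContext thread-safe?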

【Comments】:

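  • Making it a prototype bean won't make any difference, because @Autowired is applied only once, during startup. I don't know Kafka Streams internals well enough to say whether the context is thread-safe (but I would expect it to be).
  • @GaryRussell Thanks! I've updated my question to provide more context.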
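The point in the first comment about @Autowired can be modeled without Spring. In this minimal sketch, `Proc` is a hypothetical stand-in for `CapitalCaseProcessor` and `prototypeFactory` is an assumed stand-in for a prototype-scoped bean definition (it builds a new object on every call). A field filled once at startup, as @Autowired does, keeps handing out the same object whatever the bean's scope; only a lookup performed inside the supplier (which is what @Lookup or getBean aim for) yields a fresh instance per call.

```java
import java.util.function.Supplier;

// Plain-Java model, no Spring involved. Proc and prototypeFactory are
// hypothetical stand-ins for CapitalCaseProcessor and a prototype bean definition.
public class InjectionTimeDemo {

    // Hypothetical stand-in for CapitalCaseProcessor.
    static class Proc {
    }

    // Stand-in for a prototype-scoped bean: every lookup builds a new object.
    static final Supplier<Proc> prototypeFactory = Proc::new;

    // Like field injection with @Autowired: the factory is consulted once, at startup.
    static final Proc injectedOnce = prototypeFactory.get();

    // Supplier that returns the injected field (the @Autowired example in the question).
    static Supplier<Proc> fieldSupplier() {
        return () -> injectedOnce;
    }

    // Supplier that performs the lookup on every call.
    static Supplier<Proc> lookupSupplier() {
        return prototypeFactory;
    }

    public static void main(String[] args) {
        Supplier<Proc> viaField = fieldSupplier();
        System.out.println(viaField.get() == viaField.get());   // true: same object every time

        Supplier<Proc> viaLookup = lookupSupplier();
        System.out.println(viaLookup.get() == viaLookup.get()); // false: new object per call
    }
}
```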

Tags: apache-kafka-streams spring-kafka


【Answer 1】:

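I just ran a test: the processor context is not thread-safe. What makes the stream thread-safe is that you use a ProcessorSupplier (as in your first example) to create a new processor instance each time Kafka Streams asks for one, which happens once per stream task (not once per record).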

You certainly can't replace that with a Spring singleton.
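The lifecycle this relies on can be sketched in plain Java. This is a simplified model, not Kafka Streams code: `Ctx`, `Proc` and `run` are hypothetical names, the task ids are only illustrative, and the tasks run one after another instead of on separate stream threads. It shows the contract assumed above: the supplier is called once per task, `init()` runs once on that instance with that task's context, and `process()` then runs for every record of that task, so the per-instance `context` field is never shared.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java model of the processor lifecycle (no Kafka dependency).
// Ctx and Proc are hypothetical stand-ins for ProcessorContext and Processor.
public class PerTaskLifecycle {

    static class Ctx {
        final String taskId;
        Ctx(String taskId) { this.taskId = taskId; }
    }

    static class Proc {
        private Ctx context;                 // per-instance state, like CapitalCaseProcessor
        int initCalls;
        final List<String> seen = new ArrayList<>();

        void init(Ctx context) { this.context = context; initCalls++; }

        void process(String value) { seen.add(context.taskId + ":" + value); }
    }

    // Modeled runtime: one supplier call and one init() per task,
    // then process() once per record routed to that task.
    static List<Proc> run(Supplier<Proc> supplier, String[] taskIds, String[][] recordsPerTask) {
        List<Proc> instances = new ArrayList<>();
        for (int t = 0; t < taskIds.length; t++) {
            Proc p = supplier.get();
            p.init(new Ctx(taskIds[t]));
            for (String value : recordsPerTask[t]) {
                p.process(value);
            }
            instances.add(p);
        }
        return instances;
    }

    public static void main(String[] args) {
        List<Proc> procs = run(Proc::new,
                new String[] {"0_0", "0_1"},
                new String[][] {{"a", "b", "c"}, {"d"}});
        for (Proc p : procs) {
            System.out.println(p.initCalls + " " + p.seen);
        }
        // 1 [0_0:a, 0_0:b, 0_0:c]
        // 1 [0_1:d]
    }
}
```

In this model an instance is neither per record nor application-wide: it lives as long as its task and sees only that task's records.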

Here is my test, using the MessagingTransformer provided by Spring for Apache Kafka:

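package com.example.demo;

import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.streams.messaging.MessagingTransformer;
import org.springframework.kafka.support.converter.MessagingMessageConverter;

@SpringBootApplication
@EnableKafkaStreams
public class So66200448Application {

    private static final Logger log = LoggerFactory.getLogger(So66200448Application.class);

    public static void main(String[] args) {
        SpringApplication.run(So66200448Application.class, args);
    }

    @Bean
    KStream<String, String> stream(StreamsBuilder sb) {
        KStream<String, String> stream = sb.stream("so66200448");

        // The supplier creates a NEW transformer every time Kafka Streams calls it
        stream.transform(() -> new MessagingTransformer(msg -> {
            log.info(msg.toString());
            log.info(new String(msg.getHeaders().get("foo", byte[].class)));
            return msg;
        }, new MessagingMessageConverter()) {

            @Override
            public KeyValue transform(Object key, Object value) {
                try {
                    // Hold each record so both stream threads are inside transform() at the same time
                    Thread.sleep(5000);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return super.transform(key, value);
            }

        })
            .to("so66200448out");
        return stream;
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so66200448").partitions(2).replicas(1).build();
    }

    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so66200448out").partitions(2).replicas(1).build();
    }

    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            // Partition 0 gets header foo=bar
            Headers headers1 = new RecordHeaders();
            headers1.add(new RecordHeader("foo", "bar".getBytes()));
            ProducerRecord<String, String> record = new ProducerRecord<>("so66200448", 0, null, "foo", headers1);
            template.send(record);
            // Partition 1 gets header foo=baz; use a fresh Headers object because the
            // producer makes a record's headers read-only once the record is sent
            Headers headers2 = new RecordHeaders();
            headers2.add(new RecordHeader("foo", "baz".getBytes()));
            record = new ProducerRecord<>("so66200448", 1, null, "bar", headers2);
            template.send(record);
        };
    }

    @KafkaListener(id = "so66200448out", topics = "so66200448out")
    public void listen(String in) {
        System.out.println(in);
    }

}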
application.properties:

spring.kafka.streams.application-id=so66200448
spring.kafka.streams.properties.num.stream.threads=2
spring.kafka.consumer.auto-offset-reset=earliest

2021-02-16 15:57:34.322 INFO 17133 --- [-StreamThread-1] com.example.demo.So66200448Application : bar

2021-02-16 15:57:34.322 INFO 17133 --- [-StreamThread-2] com.example.demo.So66200448Application : baz

Changing the supplier so that it returns the same instance every time definitely breaks it:

@Bean
KStream<String, String> stream(StreamsBuilder sb) {
    KStream<String, String> stream = sb.stream("so66200448");
    // ONE transformer instance, created once and shared by every task (and so by both stream threads)
    MessagingTransformer transformer = new MessagingTransformer(msg -> {
        log.info(msg.toString());
        log.info(new String(msg.getHeaders().get("foo", byte[].class)));
        return msg;
    }, new MessagingMessageConverter()) {

        @Override
        public KeyValue transform(Object key, Object value) {
            try {
                Thread.sleep(5000);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return super.transform(key, value);
        }

    };
    // The supplier now returns that same instance on every call
    stream.transform(() -> transformer)
        .to("so66200448out");
    return stream;
}

2021-02-16 15:54:28.975 INFO 16406 --- [-StreamThread-1] com.example.demo.So66200448Application : baz

2021-02-16 15:54:28.975 INFO 16406 --- [-StreamThread-2] com.example.demo.So66200448Application : baz

So, for thread safety, Kafka Streams relies on the supplier returning a new instance each time it is called.
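The failure in the second run can be reproduced deterministically in plain Java. In this sketch the names are hypothetical and `Ctx` is reduced to the single `foo` header of the record currently being processed. Two tasks obtain a processor from the supplier and both call `init()` before any record is processed, as happens at startup. With a fresh instance per call each task reads its own header; with a shared instance the second `init()` overwrites the first task's context, so both tasks read `baz`, which matches the log output above.

```java
import java.util.function.Supplier;

// Plain-Java model of the experiment (no Kafka dependency). Ctx is a hypothetical,
// simplified stand-in for ProcessorContext that only exposes the "foo" header.
public class SharedInstanceDemo {

    static class Ctx {
        final String fooHeader;
        Ctx(String fooHeader) { this.fooHeader = fooHeader; }
    }

    static class Proc {
        private Ctx context;
        void init(Ctx context) { this.context = context; }  // overwrites on a shared instance
        String process() { return context.fooHeader; }       // reads whatever init() stored last
    }

    // Models startup: each of two tasks (one per stream thread) gets a processor from the
    // supplier and calls init() before any record is processed. Returns the header value
    // each task's processor reads for its record.
    static String[] run(Supplier<Proc> supplier) {
        Proc task1 = supplier.get();
        task1.init(new Ctx("bar"));  // partition 0 record carries foo=bar
        Proc task2 = supplier.get();
        task2.init(new Ctx("baz"));  // partition 1 record carries foo=baz
        return new String[] {task1.process(), task2.process()};
    }

    public static void main(String[] args) {
        String[] fresh = run(Proc::new);
        System.out.println(fresh[0] + " " + fresh[1]);    // bar baz

        Proc shared = new Proc();
        String[] broken = run(() -> shared);
        System.out.println(broken[0] + " " + broken[1]);  // baz baz
    }
}
```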

【Comments】:

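  • Of course, you could register it as a prototype bean and fetch a new instance from the application context each time the supplier is called: () -> context.getBean("myProcessor", Processor.class). But that only adds value if you need Spring to perform some initialization on it.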