【问题标题】:spring-cloud-stream with functional programming带有函数式编程的 spring-cloud-stream
【发布时间】:2021-03-22 21:50:32
【问题描述】:

在我的应用程序中,streamlistner 从多个主题消费了 kafka 消息,但我想为一个主题添加功能性消费者 bean,这可能是一些主题由 streamlistner 消费,而一些主题与消费者 bean 在同一个应用程序中? 当我尝试创建的主题仅由 streamlistner 消费而不是为消费者 bean 创建?

application.yml

spring:
  cloud:
    function:
        definition: printString
    stream:
        kafka:
            binder:
                brokers: localhost:9092
                zkNodes: localhost:2181
                autoCreateTopics: true
                autoAddPartitions: true
            bindings:
                printString-in-0:
                  destination: function-input-topic
                  group: abc
                  consumer:
                    maxAttempts: 1
                    partitioned: true
                    concurrency: 2
                xyz-input:
                  group: abc
                  destination: xyz-input-input
                  consumer:
                    maxAttempts: 1
                    partitioned: true
                    concurrency: 2

主题是为 xyz 输入主题创建的,但不是为函数输入主题创建的

消费豆

import java.util.function.Consumer;

@Component
public class xyz {

   @Bean
   Consumer<String> printString() {
       return System.out::print;
   }
}

kafkaConfig 接口

public interface KafkaConfig {

   @Input("xyz-input")
   SubscribableChannel inbound();
}

【问题讨论】:

    标签: apache-kafka spring-cloud-stream spring-cloud-stream-binder-kafka


    【解决方案1】:

    不,我们过去尝试过,但是当两种编程模式发生冲突时会出现一些微妙的问题。将任何StreamListener 分解为功能性方式(主要是删除代码)非常简单,并且应用程序实际上变得更加简单。

    KafkaConfig接口全部去掉,去掉@EnableBinding就可以了。

    【讨论】:

      猜你喜欢
      • 2019-10-24
      • 1970-01-01
      • 2021-09-08
      • 2020-12-25
      • 2020-02-10
      • 2018-11-25
      • 2020-12-10
      • 2019-03-19
      • 2020-06-01
      相关资源
      最近更新 更多