【问题标题】:Spring Cloud function RoutingSpring Cloud 函数路由
【发布时间】:2021-12-28 07:21:52
【问题描述】:

我正在尝试通过实现 Spring 云功能来创建一个 kafka。我正在使用 spring-boot 版本 2.4.0 和 spring cloud 版本:2020.0.0

@Slf4j
@Component
public class ArticleEventPublisher implements Function<ArticleAggregatedDocument, Message<ArticleMessage>> {

    private final ArticleMessageMapper articleMessageMapper;

    public ArticleEventPublisher(
        ArticleMessageMapper articleMessageMapper) {
        this.articleMessageMapper = articleMessageMapper;
    }

    @Override
    public Message<ArticleMessage> apply(ArticleAggregatedDocument articleAggregatedDocument) {
        String destination ="articleAggregated";
        log.info("CREATING MESSAGE IN FUNCTION BEAN");
        return createMessage( articleAggregatedDocument, destination);
    }

    private Message<ArticleMessage> createMessage(ArticleAggregatedDocument articleAggregatedDocument, String destination) {
        ArticleMessage articleMessage = articleMessageMapper.apply(articleAggregatedDocument);

        return MessageBuilder
            .withPayload(articleMessage)
            .setHeader(KafkaHeaders.MESSAGE_KEY, articleMessage.getKey().getBytes())
            .setHeader("spring.cloud.stream.sendto.destination", destination)
            .build();
    }
}

我知道它不是动态绑定并且目标是硬编码的,但是,我现在想测试这个特定的实现。 尽管应用程序中的控件确实转到了类并且正在创建消息,但我在“目标”中看不到它们。

我在 application.properties 中添加了以下内容:

spring.cloud.function.routing.enabled= true
spring.cloud.function.scan.packages= **publisher package name**
spring.cloud.stream.bindings.articleAggregated.destination=article.aggregated
spring.cloud.stream.bindings.articleAggregated.contentType=application/json
spring.cloud.stream.bindings.articleAggregated.producer.autoStartup=false
spring.cloud.stream.bindings.articleAggregated.producer.headerMode=none
spring.cloud.stream.kafka.bindings.articleAggregated.producer.configuration.client.id=articleAggregated
spring.cloud.stream.kafka.bindings.articleAggregated.producer.sync=true

在这里我还要提一下,IDEA intelliJ 中不再解析以下属性配置:

spring.cloud.function.routing.enabled
spring.cloud.function.scan.packages

它们被弃用了吗?我在文档中找不到任何关于它的信息。

从文档中我还读到,如果是多绑定器,应该为每个生产者/消费者添加绑定器,但是我们只为应用程序中的所有消费者和生产者使用一个绑定器,所以我没有添加它。但我确实在绝望的时刻尝试过:

spring.cloud.stream.bindings.articleAggregated.binder=kafka

我在这里缺少什么?我已经为此停留了一段时间,同时切换到 Sink+Supplier 实施作为替代方案。但我想了解为什么这个实现不起作用。

【问题讨论】:

  • 您的配置中发生了很多事情,难以理解。例如,articleAggregated 是什么?你正在使用什么版本?等等。 。你能创建一个最低限度的示例项目,将其推送到 github 并在此处留下链接,以便我们查看
  • 感谢您抽出宝贵时间阅读本文。我已经上传了一个示例项目。 github.com/rshm/SCF/tree/master

标签: spring-boot apache-kafka spring-cloud-stream spring-cloud-stream-binder-kafka spring-cloud-function


【解决方案1】:

谢谢拉什米。 让我想出最好的答案,但请随时跟进更多问题。所以。 . .

  • 在您的示例中,我看到一个函数userInfo。这将导致两个绑定 - userInfo-in-0 用于输入,userInfo-out-0 用于输出。
  • 由于此函数生成带有. . .sendto.destination 标头的消息,因此userInfo-out-0 绑定将仅在sendto.destination 标头未设置的情况下用作输出目标。
  • 在您的配置中,我看到了spring.cloud.function.routing.enabledspring.cloud.function.scan.packagesspring.kafka.bootstrap-servers 和三个spring.cloud.stream.kafka.* 属性,它们指向默认的 localhost kafka 实例,我们在默认情况下已经知道这些属性,因此这些属性都不是必需的。
  • 您也不需要spring.cloud.stream.bindings.articleAggregated.binder=kafka 属性,除非您的类路径中有多个您不需要的绑定器。

现在我对spring.cloud.stream.bindings.userAggregated.destination 属性感到困惑。 userAggregated 是一个动态目的地。通过简单地向它发送 s-c-stream ,如果它不存在,它将自动在 Kafka 中提供它,否则它将被使用并且您可以在那里看到您的消息。

换句话说,您可以删除您已配置的所有属性,将消息发送到 Kafka 中的userInfo-out-0 目的地,您应该在userAggregated 目的地中成功检索它。

如果我遗漏了什么,请告诉我。

【讨论】:

    猜你喜欢
    • 2019-08-03
    • 2019-03-31
    • 2023-01-21
    • 2018-06-23
    • 2016-08-31
    • 2017-12-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多