【Question title】: Spring Cloud Stream - Routing to multiple destinations with StreamBridge
【Posted】: 2021-11-15 09:53:08
【Question】:

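I have 3 services. "service-1" routes dynamically to the other 2 services based on the id value passed in the payload.

But the payment processor always goes to the topic bound through -out-0 ("spring.cloud.stream.function.bindings.processor-out-0"), so service-3 is never called.

I also tried the deprecated spring.cloud.stream.sendto.destination, but messages are still never routed to service-3.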

service-1

    @Bean
    public Function<Flux<String>, Flux<Object>> processor() {
        return messageFlux -> messageFlux.flatMap(
                stringMessage -> {
                    try {
                        payload = jsonMapper.readValue(stringMessage, Map.class);
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    }
                    logger.info("Payment payload  :" + payload.get("id"));
                    if(payload.get("id").equalsIgnoreCase("e-payment"))
                        streamBridge.send("processor-out-0",stringMessage);
                    else
                        streamBridge.send("processor-out-1",stringMessage);
                    return Mono.just(stringMessage);
                });
    }

    @Bean
    public Consumer<String> consumer() {
        return data -> logger.info("Payment Response :" + data);
    }

service-1 application.properties

server.port = 9000
spring.application.name=payment-process
spring.cloud.function.definition=supplier;processor;consumer
spring.cloud.stream.source=processorCheck;processorEPayment
#bindings for payment processor
spring.cloud.stream.function.bindings.processor-out-0=e-payment
spring.cloud.stream.function.bindings.processor-out-1=check-payment
spring.cloud.stream.function.bindings.processor-in-0=payment-process
#bindings for response for payment processor
spring.cloud.stream.function.bindings.consumer-in-0=payment-response
#grouping
spring.cloud.stream.bindings.processor-in-0.group=check-service
spring.cloud.stream.bindings.processor-out-0.group=check-service

service-2

    @Bean
    public Function<String,String> processorCheck() {
        return
                data ->{
                    logger.info("Data received from :" + data);
                    return "Check Payment Successful";
                };
    }

service-2 application.properties

server.port = 9002
spring.application.name=check-service
spring.cloud.function.definition=processorCheck
spring.cloud.stream.source=processorCheck
spring.cloud.stream.bindings.processorCheck-in-0.destination=check-payment
spring.cloud.stream.bindings.processorCheck-out-0.destination=payment-response
spring.cloud.stream.bindings.processorCheck-in-0.group=check-service
spring.cloud.stream.bindings.processorCheck-out-0.group=check-service
spring.cloud.stream.function.routing.enabled=true

Consumer service-3

    @Bean
    public Function<String,String> processorEPayment() {
        return
         data ->{
                 logger.info("Data received from :" + data);
                 return "E-Payment Successful";
         };
    }

service-3 application.properties

server.port = 9001
spring.application.name=e-payment-service
spring.cloud.function.definition=processorEPayment
#input to e-payment service topic
spring.cloud.stream.bindings.processorEPayment-in-0.destination=e-payment
spring.cloud.stream.source=processorEPayment
#output response to e-payment topic
spring.cloud.stream.bindings.processorEPayment-out-0.destination=payment-response
spring.cloud.stream.bindings.processorEPayment-in-0.group=e-payment-service
spring.cloud.stream.bindings.processorEPayment-out-0.group=e-payment-service
spring.cloud.stream.function.routing.enabled=true

【Comments】:

    Tags: java spring spring-boot spring-cloud spring-cloud-stream


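    【Solution 1】:

    In your service-1, change the first StreamBridge send call to streamBridge.send("e-payment", stringMessage); and the second to streamBridge.send("check-payment", stringMessage); The messages will then be sent to the topics e-payment and check-payment, respectively.

    Remove these two properties from service-1 (they are unnecessary and may change the semantics of the bindings):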
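The routing decision described above can be sketched in isolation. This is a minimal illustration, not the asker's actual code: PaymentRouter and destinationFor are hypothetical names, and the BiFunction is a stand-in for StreamBridge's send(String, Object) method so that the sketch does not need Spring on the classpath. In the real service you would presumably pass streamBridge::send.

```java
import java.util.function.BiFunction;

// Hypothetical helper (not part of Spring Cloud Stream): it isolates the
// per-message choice of destination name.
final class PaymentRouter {

    // Stand-in for StreamBridge#send(String, Object), which returns a boolean.
    // Assumption: in the real service this would be streamBridge::send.
    private final BiFunction<String, Object, Boolean> sender;

    PaymentRouter(BiFunction<String, Object, Boolean> sender) {
        this.sender = sender;
    }

    // Maps the payload id to the destination (topic) name used in the question.
    // Any id other than "e-payment" (including null) falls back to "check-payment".
    static String destinationFor(String id) {
        return "e-payment".equalsIgnoreCase(id) ? "e-payment" : "check-payment";
    }

    // Sends the message to the destination chosen for this id.
    boolean route(String id, String message) {
        return sender.apply(destinationFor(id), message);
    }
}
```

Inside processor(), the if/else would then reduce to a single call such as router.route(id, stringMessage), with the destination names matching the topics that service-2 and service-3 consume from.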

    spring.cloud.stream.function.bindings.processor-out-0=e-payment
    spring.cloud.stream.function.bindings.processor-out-1=check-payment
    

    You also need to add a destination for the output binding:

    spring.cloud.stream.bindings.processor-out-0.destination=payment-response
    

    The consumer will then consume from that topic (once you change the consumer's binding to reflect it).

    【Comments】:
