【问题标题】:Spring Cloud Stream - route to multiple dynamic destinations at runtimeSpring Cloud Stream - 在运行时路由到多个动态目的地
【发布时间】:2020-06-07 14:54:56
【问题描述】:

我有一个用例,我需要生成在运行时确定的多个 Kafka 主题/目的地。我尝试通过使用来自Function 类型的功能bean 的返回Flux<Message<T>> 来组合Functions with multiple input and output arguments,并为每个Message 设置标题spring.cloud.stream.sendto.destination,如here 所述。我想出了以下实现:

@Bean
public Function<Person, Flux<Message<Person>>> route() {
    return person -> Flux.fromIterable(Stream.of(person.getEvents())
            .map(e -> MessageBuilder.withPayload(person)
                    .setHeader("spring.cloud.stream.sendto.destination", e).build())
            .collect(Collectors.toList()));
}

我的配置中也有这个:

spring.cloud.stream.dynamic-destinations=

这是我的Person

@AllArgsConstructor
@NoArgsConstructor
@Data
public class Person {
    private String[] events;
    private String name;
}

events 包含 Kafka 主题名称列表。

但是,它不起作用。我错过了什么?

【问题讨论】:

    标签: spring spring-boot spring-cloud-stream


    【解决方案1】:

    spring.cloud.stream.sendto.destination 在内部使用 BinderAwareChannelResolver,已弃用 StreamBridge。我认为你可以重写你的代码如下。我还没有测试过,但是这里是模板:

    @Autowired StreamBridge streamBridge;
    
    @Bean
    public Consumer<Person> route() {
        return person -> streamBridge.send(person.getName(), person);
    }
    
    

    在幕后,Spring Cloud Stream 将为Person 动态创建绑定。

    如果您在部署时提前知道您的目的地,您也可以通过配置进行设置。例如spring.cloud.stream.sourcefoo;bar..;...。然后框架以foo-out-0bar-out-0 等的形式创建输出绑定。然后您需要设置目的地-spring.cloud.stream.bindings.foo-out-0.destination=foo。但是由于您的用例严格涉及动态目的地,因此您不能采用这种方法,而是尝试使用我上面建议的方法。

    【讨论】:

      【解决方案2】:

      一个有效的解决方案使用BinderAwareChannelResolver。但是,如果在 3.0 中提供 spring.cloud.stream.sendto.destination 属性,则不赞成使用它。

      @Autowired
      private BinderAwareChannelResolver binderAwareChannelResolver;
      
      @Bean
      public Consumer<Person> route() {
      return person ->
              Stream.of(person.getEvents())
                      .forEach(e -> binderAwareChannelResolver.resolveDestination(e)
                              .send(MessageBuilder.withPayload(person).build()));
      }
      

      我不喜欢这个解决方案,因为它结合了基于函数的编程模型和“传统风格”的编程模型。如果有人有更好的解决方案,请随时评论/回答。

      【讨论】:

        猜你喜欢
        • 2021-11-15
        • 2021-08-03
        • 1970-01-01
        • 2018-06-23
        • 2021-08-04
        • 2019-03-31
        • 1970-01-01
        • 1970-01-01
        • 2023-03-19
        相关资源
        最近更新 更多