【发布时间】:2020-06-07 14:54:56
【问题描述】:
我有一个用例,我需要生成在运行时确定的多个 Kafka 主题/目的地。我尝试通过使用来自Function 类型的功能bean 的返回Flux<Message<T>> 来组合Functions with multiple input and output arguments,并为每个Message 设置标题spring.cloud.stream.sendto.destination,如here 所述。我想出了以下实现:
@Bean
public Function<Person, Flux<Message<Person>>> route() {
return person -> Flux.fromIterable(Stream.of(person.getEvents())
.map(e -> MessageBuilder.withPayload(person)
.setHeader("spring.cloud.stream.sendto.destination", e).build())
.collect(Collectors.toList()));
}
我的配置中也有这个:
spring.cloud.stream.dynamic-destinations=
这是我的Person:
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Person {
private String[] events;
private String name;
}
events 包含 Kafka 主题名称列表。
但是,它不起作用。我错过了什么?
【问题讨论】:
标签: spring spring-boot spring-cloud-stream