【Posted】: 2021-02-28 16:45:58
【Problem description】:
Can anyone help me set a random KafkaHeaders.MESSAGE_KEY in the code below, so that I can publish payloads to different partitions?
@Bean
public IntegrationFlow fileInboundChannelFlow() {
    FileInboundChannelAdapterSpec messageSourceSpec = Files
            .inboundAdapter(Paths.get(this.properties.getDirectory()).toFile());
    messageSourceSpec = messageSourceSpec.filter(getFilter());
    //messageSourceSpec.regexFilter(this.properties.getFilenameRegex());
    messageSourceSpec.preventDuplicates(this.properties.isPreventDuplicates());
    IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(messageSourceSpec)
            .split(new FileSplitter(true, true))
            .enrichHeaders(h -> h.header(KafkaHeaders.MESSAGE_KEY, "payload.flightNumber"));
    return flowBuilder.<Object, Class<?>>route(Object::getClass,
            m -> m.channelMapping(FileSplitter.FileMarker.class, "markers.input")
                    .channelMapping(String.class, "lines.input"))
            .get();
}
I want to set the key based on payload.prod_cd. Since this is a singleton bean, I want a different Kafka header key to be set for each payload.
【Discussion】:
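One thing that may explain the behavior: `h.header(name, value)` stores the given value as-is, so the code above puts the literal string "payload.flightNumber" on every message, and every record then hashes to the same partition. The header enricher also offers `headerExpression(name, expression)` and `headerFunction(name, function)`, which are evaluated per message at runtime, so the bean being a singleton should not matter. Below is a minimal sketch of a per-message key function in plain Java. The class name `MessageKeys`, the method `keyFor`, and the assumption that each line is comma-separated with the product code in the first field are all hypothetical, since the question does not show the line format; lines without a usable field fall back to a random UUID.

```java
import java.util.UUID;

// Hypothetical helper, not part of Spring Integration or the original post.
final class MessageKeys {

    private MessageKeys() {
    }

    // Derives a Kafka message key from one line of the file.
    // ASSUMPTION: the line is comma-separated and its first field is the
    // product code (prod_cd). Adjust the parsing to the real format.
    static String keyFor(String line) {
        if (line == null) {
            return UUID.randomUUID().toString();
        }
        String firstField = line.split(",", 2)[0].trim();
        // No usable field: use a random key so records still spread out.
        return firstField.isEmpty() ? UUID.randomUUID().toString() : firstField;
    }

    public static void main(String[] args) {
        System.out.println(keyFor("AB123,2021-02-28,JFK"));
        System.out.println(keyFor(""));
    }
}

// Possible wiring in the flow from the question (requires Spring Integration):
//   .enrichHeaders(h -> h.headerFunction(KafkaHeaders.MESSAGE_KEY,
//           m -> MessageKeys.keyFor(String.valueOf(m.getPayload()))))
```

If a purely random key is enough, the function can simply return `UUID.randomUUID().toString()`. A SpEL variant such as `h.headerExpression(KafkaHeaders.MESSAGE_KEY, "payload.prod_cd")` would only work if the payload is an object exposing a `prod_cd` property; after `FileSplitter` the payloads here are `String` lines and `FileMarker` objects, so the line has to be parsed first.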
Tags: java spring apache-kafka spring-integration