【问题标题】:How to set the KafkaHeaders.MESSAGE_KEY in spring integration如何在 spring 集成中设置 KafkaHeaders.MESSAGE_KEY
【发布时间】:2021-02-28 16:45:58
【问题描述】:

谁能帮我为下面的代码设置随机KafkaHeaders.MESSAGE_KEY,以便我可以将有效负载发布到不同的分区。

@Bean
public IntegrationFlow fileInboundChannelFlow() {
    FileInboundChannelAdapterSpec messageSourceSpec = Files
            .inboundAdapter(Paths.get(this.properties.getDirectory()).toFile());

    messageSourceSpec = messageSourceSpec.filter(getFilter());
    //messageSourceSpec.regexFilter(this.properties.getFilenameRegex());
    messageSourceSpec.preventDuplicates(this.properties.isPreventDuplicates());

    IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(messageSourceSpec)
            .split(new FileSplitter(true, true))
            .enrichHeaders(h -> h.header(KafkaHeaders.MESSAGE_KEY, "payload.flightNumber"));

    return flowBuilder.<Object, Class<?>>route(Object::getClass,
            m -> m.channelMapping(FileSplitter.FileMarker.class, "markers.input").channelMapping(String.class,
                    "lines.input"))
            .get();
}

我想根据 payload.prod_cd 设置密钥。 由于它是单例 bean,我想为每个有效负载初始化不同的 kafka 标头键。

【问题讨论】:

    标签: java spring apache-kafka spring-integration


    【解决方案1】:

    你在正确的轨道上:

    .enrichHeaders(h -> h.header(KafkaHeaders.MESSAGE_KEY, "payload.flightNumber"));
    

    您只需要改用headerExpression()

    /**
     * Add a single header specification where the value is a String representation of a
     * SpEL {@link Expression}. If the header exists, it will <b>not</b> be overwritten
     * unless {@link #defaultOverwrite(boolean)} is true.
     * @param name the header name.
     * @param expression the expression.
     * @return the header enricher spec.
     */
    public HeaderEnricherSpec headerExpression(String name, String expression) {
    

    【讨论】:

    • 您好我已经更改了文件 .enrichHeaders(h -> h.headerExpression(KafkaHeaders.MESSAGE_KEY,"T(java.util.UUID).randomUUID().toString()"));但每个文件都有不同的键而不是文件中的每条记录
    • 这样的enrichHeaders() 运算符必须在.split(new FileSplitter(true, true)) 之后。然后文件中的每条记录都会有自己的KafkaHeaders.MESSAGE_KEY。注意:对于 UUID 变体,您可以考虑使用 headerFunction() 来代替方便和代码完成。
    猜你喜欢
    • 2022-01-15
    • 2020-05-08
    • 2012-07-02
    • 2017-01-23
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多