【问题标题】:How do I convert this spring-integration configuration from XML to Java?如何将此弹簧集成配置从 XML 转换为 Java?
【发布时间】:2019-03-27 19:53:32
【问题描述】:

这个特定的部分在应用程序而不是 XML 中实现是有意义的,因为它在整个集群中是一个常量,而不是本地化到单个作业。

通过剖析 XSD,在我看来,int-kafka:outbound-channel-adapter 的 xml 构造了一个 KafkaProducerMessageHandler。

没有可见的方法来设置频道、主题或大多数其他属性。

注意潜在的反对者 - (咆哮)我已经使用 RTFM 一周了,并且比我开始时更加困惑。我对语言的选择已经从形容词到副词毕业,我开始从其他语言借词。答案可能就在里面。但如果是这样,它就不是普通人能找到的。 (吐槽)

XML 配置:

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="kafkaTemplate"
                                    auto-startup="false"
                                    channel="outbound-staging"
                                    topic="foo"
                                    sync="false"
                                    message-key-expression="'bar'"
                                    send-failure-channel="failures"
                                    send-success-channel="successes"
                                    partition-id-expression="2">
</int-kafka:outbound-channel-adapter>

如果是这样,那么我希望 java 配置看起来像这样:

@Bean
public KafkaProducerMessageHandler kafkaOutboundChannelAdapter () {
    KafkaProducerMessageHandler result = new KafkaProducerMessageHandler(kafkaTemplate());

    result.set????? ();    // WTH?? No methods for most of the attributes?!!!

    return result;
}

编辑:关于正在解决的高级问题的附加信息

作为一个更大项目的一部分,我正在尝试实现来自 https://docs.spring.io/spring-batch/4.0.x/reference/html/spring-batch-integration.html#remote-partitioning 的教科书示例,使用 Kafka 支持而不是 JMS 支持。

我相信最终的集成流程应该是这样的:

partitionHandler -> messingTemplate -> outbound-requests (DirectChannel) -> outbound-staging (KafkaProducerMessageHandler) -> kafka

kafka -> executionContainer (KafkaMessageListenerContainer) -> inboundKafkaRequests (KafkaMessageDrivenChannelAdapter) -> inbound-requests (DirectChannel) -> serviceActivator (StepExecutionRequestHandler)

serviceActivator (StepExecutionRequestHandler) -> 回复-staging (KafkaProducerMessageHandler) -> kafka

kafka -> replyContainer (KafkaMessageListenerContainer) -> inboundKafkaReplies (KafkaMessageDrivenChannelAdapter) -> inbound-replies (DirectChannel) -> partitionhandler

【问题讨论】:

    标签: java spring spring-integration


    【解决方案1】:

    不知道你的意思是他们错过了,但这是我在KafkaProducerMessageHandler的源代码中看到的:

    public void setTopicExpression(Expression topicExpression) {
        this.topicExpression = topicExpression;
    }
    
    public void setMessageKeyExpression(Expression messageKeyExpression) {
        this.messageKeyExpression = messageKeyExpression;
    }
    
    public void setPartitionIdExpression(Expression partitionIdExpression) {
        this.partitionIdExpression = partitionIdExpression;
    }
    
    /**
     * Specify a SpEL expression to evaluate a timestamp that will be added in the Kafka record.
     * The resulting value should be a {@link Long} type representing epoch time in milliseconds.
     * @param timestampExpression the {@link Expression} for timestamp to wait for result
     * fo send operation.
     * @since 2.3
     */
    public void setTimestampExpression(Expression timestampExpression) {
        this.timestampExpression = timestampExpression;
    }
    

    等等。

    您还可以访问超类设置器,例如用于您的 XML 变体的 setSync()

    input-channel 不是MessageHandler 的责任。它转到Endpoint,可以通过@ServiceActivator@Bean 进行配置。

    在 Core Spring 集成参考手册中查看更多信息:https://docs.spring.io/spring-integration/reference/html/#annotations_on_beans

    开头还有很重要的一章:https://docs.spring.io/spring-integration/reference/html/#programming-tips

    此外,最好考虑使用 Java DSL 而不是直接使用MessageHandler

                 Kafka
                    .outboundChannelAdapter(producerFactory)
                    .sync(true)
                    .messageKey(m -> m
                            .getHeaders()
                            .get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                    .headerMapper(mapper())
                    .partitionId(m -> 0)
                    .topicExpression("headers[kafka_topic] ?: '" + topic + "'")
                    .configureKafkaTemplate(t -> t.id("kafkaTemplate:" + topic))
                    .get();
    

    在提到的 Spring 集成文档中查看有关 Java DSL 的更多信息:https://docs.spring.io/spring-integration/reference/html/#java-dsl

    【讨论】:

    • 谢谢阿特姆。我真的很感谢你的帮助。我会审查这些文件。我只想连接到 kafka 的“简单”连接,而且我必须学习两种嵌入在 java 中的新编程语言,并且对先验知识有未说明的假设。最后,我希望有几十行代码,但到了这一点非常令人沮丧。
    • 好吧,对于简单的用例,您可以考虑直接使用KafkaTemplateKafkaProducer。但是最后你会发现 Spring Integration 是构建复杂消息集成解决方案的真正有用的工具。
    • 我同意,它看起来是一个很好的(也是必要的)工具,否则我不会坚持这么远。不幸的是,这些文档读起来像功能规范而不是操作指南,因此除非您非常熟悉内部结构并计划将您职业生涯的大部分时间专门用于它,否则您无法知道如何将它们连接起来甚至找出如何将它们连接起来。我从 DSL 示例开始,然后逐渐回归到我可以理解的东西。 DSL 对熟悉它的人来说可能看起来很棒,但我发现它完全不可读。
    • get() 返回一个对象的实例,它将成为 Spring Container 中的一个 bean。之后还有更多的自动化工作由 Spring 完成。
    • 不知道如何从这里获得帮助,但您确实需要让自己熟悉所有 EIP (enterpriseintegrationpatterns.com) 首先掌握一袋原理和概念,然后来我们的 Java DSL 章节查看所有相关性:docs.spring.io/spring-integration/reference/html/#java-dslUsing Protocol Adapters 段落应该阐明什么是 namespace factory 以及如何使用它。
    猜你喜欢
    • 2018-07-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-11-08
    • 2015-07-22
    • 1970-01-01
    • 2015-05-30
    • 2011-01-15
    相关资源
    最近更新 更多