【问题标题】:Spring integration Java DSL - Dynamically create IntegrationFlowsSpring 集成 Java DSL - 动态创建 IntegrationFlows
【发布时间】:2018-05-17 14:23:13
【问题描述】:

我正在使用 Spring Boot 1.5.13.RELEASE 和 Spring Integration 4.3.16.RELEASE 开发应用程序。

我对 Spring Integration 还很陌生,遇到了一个问题。

所以基本想法是,在一些外部触发器(可能是 HTTP 调用)上,我需要创建一个 IntegrationFlow,它将使用来自 rabbitMQ 队列的消息,使用它们做一些工作,然后(可能)生成另一个 rabbitMQ端点。

现在这种情况应该会发生很多次,所以我必须创建多个集成流。

我正在使用 IntegrationFlowContext 来注册每个 IntegrationFlow,如下所示:

IntegrationFlowContext flowContext;
...
IntegrationFlow integrationFlow = myFlowFactory.makeFlow(uuid);
...
flowContext.registration(integrationFlow).id(callUUID).register();

我必须澄清这可以同时发生,同时创建多个集成流。

所以每次我尝试创建一个集成流时,我的“源”是一个看起来像这样的网关:

MessagingGatewaySupport sourceGateway = Amqp
        .inboundGateway(rabbitTemplate.getConnectionFactory(), rabbitTemplate, dynamicQueuePrefix+uuid)
        .concurrentConsumers(1)
        .adviceChain(retryInterceptor)
        .autoStartup(false)
        .id("sgX-" + uuid)
        .get();

它还不是@Bean,但我希望它在每个 IntegrationFlow 注册时都能注册。

我的“目标”是一个 AmqpOutBoundAdapter,如下所示:

@Bean
public AmqpOutboundEndpoint outboundAdapter(
        RabbitTemplate rabbitTemplate,
        ApplicationMessagingProperties applicationMessagingProperties
) {
    return Amqp.outboundAdapter(rabbitTemplate)
            .exchangeName("someStandardExchange")
            .routingKeyExpression("headers.get('rabbitmq.ROUTING_KEY')")
            .get();
}

现在这个 IS 已经是一个 bean,并且每次我尝试创建流时都会被注入。

我的流程看起来像这样:

public IntegrationFlow configure() {
    return IntegrationFlows
            .from(sourceGateway)
            .transform(Transformers.fromJson(HashMap.class, jsonObjectMapper))
            .filter(injectedGenericSelectorFilter)
            .<HashMap<String, String>>handle((payload, headers) -> {

                String uuid = payload.get("uuid");

                boolean shouldForwardMessage = myInjectedApplicationService.isForForwarding(payload);
                myInjectedApplicationService.handlePayload(payload);

                return MessageBuilder
                        .withPayload(payload)
                        .setHeader("shouldForward", shouldForwardMessage)
                        .setHeader("rabbitmq.ROUTING_KEY", uuid)
                        .build();
            })
            .filter("headers.get('shouldForward').equals(true)")
            .transform(Transformers.toJson(jsonObjectMapper))
            .handle(outboundAdapter)
            .get();
}

我的问题是,当应用程序启动正常并创建第一个集成流等时。稍后,我会遇到这种异常:

java.lang.IllegalStateException: 无法在 bean 名称 'org.springframework.integration.transformer.MessageTransformingHandler#872' 下注册对象 [org.springframework.integration.transformer.MessageTransformingHandler#872]: 已经有对象 [org. springframework.integration.transformer.MessageTransformingHandler#872] 绑定

我什至尝试为每个使用的组件设置一个 id,它应该用作 beanName ,如下所示:

.transform(Transformers.fromJson(HashMap.class, jsonObjectMapper), tf -> tf.id("tf1-"+uuid))

但是,即使 .filter 等组件的 bean 名称问题得到了解决,我仍然会遇到关于 MessageTransformingHandler 的相同异常。


更新

我没有提到这样一个事实:一旦每个 IntegrationFlow 完成其工作,就会使用 IntegrationFlowContext 将其删除,如下所示:

flowContext.remove(flowId);

因此,似乎(某种)起作用的是通过使用相同的对象作为锁来同步 流注册 块和 流删除 块。 p>

所以我负责注册和删除流程的班级如下所示:

...
private final Object lockA = new Object();
...

public void appendNewFlow(String callUUID){
    IntegrationFlow integrationFlow = myFlowFactory.makeFlow(callUUID);

    synchronized (lockA) {
        flowContext.registration(integrationFlow).id(callUUID).register();
    }
}

public void removeFlow(String flowId){

    synchronized (lockA) {
        flowContext.remove(flowId); 
    }

}
...

我现在的问题是这种锁对应用程序来说有点重,因为我得到了很多:

...Waiting for workers to finish.
...
...Successfully waited for workers to finish.

这并没有我想的那么快。

但我猜这是意料之中的,因为每次线程获取锁时,注册流程及其所有组件或注销流程都需要一些时间及其所有组件。

【问题讨论】:

    标签: java spring spring-integration spring-integration-dsl spring-integration-amqp


    【解决方案1】:

    你也有这个:

    .transform(Transformers.toJson(jsonObjectMapper))
    

    如果你也在那里添加.id(),它会如何工作?

    另一方面,既然你说这是同时发生的,我想知道你是否可以制作一些代码synchonized,例如包装flowContext.registration(integrationFlow).id(callUUID).register();

    bean 定义和注册过程实际上不是线程安全的,并且只能从应用程序生命周期开始时初始化线程的那一个开始使用。

    我们可能确实需要在其register(IntegrationFlowRegistrationBuilder builder) 函数中或至少在其registerBean(Object bean, String beanName, String parentName) 中将IntegrationFlowContext 作为线程安全的,因为这正是我们生成bean 名称并注册它的地方。

    请随时就此事提出 JIRA。

    不幸的是,Spring Integration Java DSL 扩展项目已经不受支持,我们只能对当前的5.x 一代添加修复。尽管如此,我相信synchonized 的解决方法应该在这里工作,因此无需将其反向移植到 Spring Integration Java DSL 扩展中。

    【讨论】:

    • 感谢您的回复!我已经尝试将.id() 添加到包括其他变压器在内的所有组件中,但无济于事。我还尝试在负责注册流程的方法中使用synchronized,但这也不起作用(令我惊讶的是)。也许我错过了一些东西。会再玩一些,如果我找到解决方案,一定会回信。
    • 也许你可以分享更多关于这件事的堆栈跟踪?我的意思是在你修复了 id 和 synchronized
    • 当然!我明天会这样做,因为我目前无法这样做!
    • @Alexreve,解决方法在这里:github.com/spring-projects/spring-integration/pull/2448。没有那么简单,因此我看不出我们如何向后移植它。让您的代码在这件事上同步会很棒。
    • 太棒了!虽然我不确定我们是否可以立即迁移到版本 5.x.x。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-02-03
    • 2020-07-11
    相关资源
    最近更新 更多