【发布时间】:2018-05-17 14:23:13
【问题描述】:
我正在使用 Spring Boot 1.5.13.RELEASE 和 Spring Integration 4.3.16.RELEASE 开发应用程序。
我对 Spring Integration 还很陌生,遇到了一个问题。
所以基本想法是,在一些外部触发器(可能是 HTTP 调用)上,我需要创建一个 IntegrationFlow,它将使用来自 rabbitMQ 队列的消息,使用它们做一些工作,然后(可能)生成另一个 rabbitMQ端点。
现在这种情况应该会发生很多次,所以我必须创建多个集成流。
我正在使用 IntegrationFlowContext 来注册每个 IntegrationFlow,如下所示:
IntegrationFlowContext flowContext;
...
IntegrationFlow integrationFlow = myFlowFactory.makeFlow(uuid);
...
flowContext.registration(integrationFlow).id(callUUID).register();
我必须澄清这可以同时发生,同时创建多个集成流。
所以每次我尝试创建一个集成流时,我的“源”是一个看起来像这样的网关:
MessagingGatewaySupport sourceGateway = Amqp
.inboundGateway(rabbitTemplate.getConnectionFactory(), rabbitTemplate, dynamicQueuePrefix+uuid)
.concurrentConsumers(1)
.adviceChain(retryInterceptor)
.autoStartup(false)
.id("sgX-" + uuid)
.get();
它还不是@Bean,但我希望它在每个 IntegrationFlow 注册时都能注册。
我的“目标”是一个 AmqpOutBoundAdapter,如下所示:
@Bean
public AmqpOutboundEndpoint outboundAdapter(
RabbitTemplate rabbitTemplate,
ApplicationMessagingProperties applicationMessagingProperties
) {
return Amqp.outboundAdapter(rabbitTemplate)
.exchangeName("someStandardExchange")
.routingKeyExpression("headers.get('rabbitmq.ROUTING_KEY')")
.get();
}
现在这个 IS 已经是一个 bean,并且每次我尝试创建流时都会被注入。
我的流程看起来像这样:
public IntegrationFlow configure() {
return IntegrationFlows
.from(sourceGateway)
.transform(Transformers.fromJson(HashMap.class, jsonObjectMapper))
.filter(injectedGenericSelectorFilter)
.<HashMap<String, String>>handle((payload, headers) -> {
String uuid = payload.get("uuid");
boolean shouldForwardMessage = myInjectedApplicationService.isForForwarding(payload);
myInjectedApplicationService.handlePayload(payload);
return MessageBuilder
.withPayload(payload)
.setHeader("shouldForward", shouldForwardMessage)
.setHeader("rabbitmq.ROUTING_KEY", uuid)
.build();
})
.filter("headers.get('shouldForward').equals(true)")
.transform(Transformers.toJson(jsonObjectMapper))
.handle(outboundAdapter)
.get();
}
我的问题是,当应用程序启动正常并创建第一个集成流等时。稍后,我会遇到这种异常:
java.lang.IllegalStateException: 无法在 bean 名称 'org.springframework.integration.transformer.MessageTransformingHandler#872' 下注册对象 [org.springframework.integration.transformer.MessageTransformingHandler#872]: 已经有对象 [org. springframework.integration.transformer.MessageTransformingHandler#872] 绑定
我什至尝试为每个使用的组件设置一个 id,它应该用作 beanName ,如下所示:
.transform(Transformers.fromJson(HashMap.class, jsonObjectMapper), tf -> tf.id("tf1-"+uuid))
但是,即使 .filter 等组件的 bean 名称问题得到了解决,我仍然会遇到关于 MessageTransformingHandler 的相同异常。
更新
我没有提到这样一个事实:一旦每个 IntegrationFlow 完成其工作,就会使用 IntegrationFlowContext 将其删除,如下所示:
flowContext.remove(flowId);
因此,似乎(某种)起作用的是通过使用相同的对象作为锁来同步 流注册 块和 流删除 块。 p>
所以我负责注册和删除流程的班级如下所示:
...
private final Object lockA = new Object();
...
public void appendNewFlow(String callUUID){
IntegrationFlow integrationFlow = myFlowFactory.makeFlow(callUUID);
synchronized (lockA) {
flowContext.registration(integrationFlow).id(callUUID).register();
}
}
public void removeFlow(String flowId){
synchronized (lockA) {
flowContext.remove(flowId);
}
}
...
我现在的问题是这种锁对应用程序来说有点重,因为我得到了很多:
...Waiting for workers to finish.
...
...Successfully waited for workers to finish.
这并没有我想的那么快。
但我猜这是意料之中的,因为每次线程获取锁时,注册流程及其所有组件或注销流程都需要一些时间及其所有组件。
【问题讨论】:
标签: java spring spring-integration spring-integration-dsl spring-integration-amqp