【问题标题】:Spring Integration Async Gateway with error-channel causes threads to park带有错误通道的 Spring Integration Async Gateway 导致线程停止
【发布时间】:2014-12-12 20:25:40
【问题描述】:

我有一个异步网关,其中定义了一个异步执行器和一个错误通道。还有一个处理异常的服务激活器。

<int:gateway id="myGateway"
             service-interface="me.myGateway"
             default-request-channel="requestChannel"
             async-executor="taskScheduler"
             error-channel="errorChannel"/>

<int:service-activator ref="errorChannelMessageHandler" method="handleFailedInvocation"
                       input-channel="errorChannel"/>
public interface MyGateway {
    Future<Object> send(Message message);
}

@Component
public class ErrorChannelMessageHandler {
    @ServiceActivator
    public Object handleFailedInvocation(MessagingException exception) {
        // do some stuff here
        return null;
    }
}

当抛出异常时,线程阻塞,等待网关创建的临时回复通道上的回复。

如何防止线程阻塞以及何时引发异常以让错误通道服务激活器处理消息并执行一些自定义逻辑?

后期编辑: 我正在使用 SI 3.0.3.RELEASE 当为网关定义错误通道时,如果发生异常,它将调用此行(来自 MessagingGatewaySupport 的没有 250)errorFlowReply = this.messagingTemplate.sendAndReceive(this.errorChannel, errorMessage),它将执行在新创建的临时回复通道上阻止接收(来自 MessagingTemplate 的第 352-367 行)。我的错误通道的服务激活器是否负责在这个新创建的临时通道上发布消息,因此线程不会阻塞接收?

【问题讨论】:

    标签: spring-integration asynchronous


    【解决方案1】:

    删除错误通道,以便将原始异常添加到Future,以便get() 传播它。

    如果您想对错误流执行某些操作,请从错误流中抛出异常,以便在 Future 中可用,或将 reply-timeout 设置为 0。但是,您的 Future 永远不会在那个案例。

    【讨论】:

    • 长话短说:我的 SI 链连接到 RabbitMQ 并发布消息。当 rabbit 连接断开时,将抛出 AmqpConnectException。在这种情况下,我想做的是获取消息并将它们保存在某个地方(数据库,本地文件)。这是错误通道有用的地方,因为在服务激活器中我将执行此操作。如果我删除错误通道,我该怎么做? Future 上的 get() 可能并不总是被调用,因为有时我想要一个答案,有时我只是一劳永逸。
    • 还有一个问题:如果我删除了错误通道,那么如果我不能正确使用它,那么为网关设置这个属性有什么意义呢?或者我在这里遗漏了一些东西。谢谢
    • 您可以使用错误通道,但错误流必须返回结果或抛出异常(或者如我所说,将reply-timeout 设置为0)。除非您有进一步的异步切换,否则它不会影响“良好”流 - 回复超时仅在线程返回网关时开始。
    • MessagingTemplate 仍将创建一个临时回复通道,并将阻止接收。如果没有回复超时,我看不到这将如何工作。错误流如何返回结果或抛出异常?
    • 这个怎么样? @ Component public class ErrorChannelMessageHandler { @ ServiceActivator public void handleFailedInvocation(Message&lt;MessagingException&gt; message) { //persist the message.getPayload().getFailedMessage(); MessageChannel replyChannel = (MessageChannel) message.getHeaders().get("replyChannel"); CustomResponse customResponse = new CustomResponse(...); replyChannel.send(MessageBuilder.withPayload(customResponse).build()); } }我向网关发送响应,未来总会返回响应,我可以自定义响应
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-07-07
    • 1970-01-01
    • 2019-07-20
    • 2015-02-14
    • 1970-01-01
    • 2017-04-23
    • 1970-01-01
    相关资源
    最近更新 更多