【问题标题】:Spring Integration flow loses subscriberSpring Integration 流程失去了订阅者
【发布时间】:2018-11-28 12:33:33
【问题描述】:

我有消耗 org.springframework.web.reactive.socket.WebSocketMessage 的 SI 流,并对其进行了一些处理,其中包括使用 Netty 的 ByteBuf 处理它的有效负载。在某些时候,我的流程中发生了异常:

org.springframework.messaging.MessageHandlingException:消息处理程序[_org.springframework.integration.errorLogger.handler]中发生错误;嵌套异常是 io.netty.util.IllegalReferenceCountException: refCnt: 0 在 org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:184) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE] 在 org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:175) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE] 在 org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:224) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE] 在 org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:180) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE] 在 org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE] 在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE] 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE] 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE] 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE] 在 org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.6.RELEASE.jar!/:5.0.6.RELEASE] 在 org.springframework.integration.channel.MessagePublishingErrorHandler.handleError(MessagePublishingErrorHandler.java:93) ~[spring-integration-core-5.0.5.RELEASE.jar!/:5.0.5.RELEASE] ... 引起:io.netty.util.IllegalReferenceCountException:refCnt:0 在 io.netty.buffer.AbstractByteBuf.ensureAccessible(AbstractByteBuf.java:1417) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final] 在 io.netty.buffer.AbstractByteBuf.checkIndex(AbstractByteBuf.java:1356) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final] 在 io.netty.buffer.AbstractByteBuf.getInt(AbstractByteBuf.java:417) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final] 在 io.netty.buffer.ByteBufUtil.hashCode(ByteBufUtil.java:175) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final] 在 io.netty.buffer.AbstractByteBuf.hashCode(AbstractByteBuf.java:1315) ~[netty-buffer-4.1.24.Final.jar!/:4.1.24.Final] 在 org.springframework.core.io.buffer.NettyDataBuffer.hashCode(NettyDataBuffer.java:288) ~[spring-core-5.0.6.RELEASE.jar!/:5.0.6.RELEASE] 在 org.springframework.web.reactive.socket.WebSocketMessage.hashCode(WebSocketMessage.java:134) ~[spring-webflux-5.0.6.RELEASE.jar!/:5.0.6.RELEASE] 在 java.lang.Object.toString(Object.java:236) ~[?:1.8.0_161] 在 java.lang.String.valueOf(String.java:2994) ~[?:1.8.0_161] 在 java.lang.StringBuilder.append(StringBuilder.java:131) ~[?:1.8.0_161]

之后,处理所有二进制 Web 套接字消息失败,并出现以下异常:

2018-11-26T10:38:29,133 错误 --- [-server-epoll-7] o.s.i.h.LoggingHandler (:) org.springframework.messaging.MessageDeliveryException:无法将消息发送到通道“binaryWebSocketMessageChannel”;嵌套异常是 java.lang.IllegalStateException: The [binaryWebSocketMessageChannel] 没有订阅者接受消息,failedMessage=GenericMessage [payload=MyPayload(payload=org.springframework.web.reactive.socket.WebSocketMessage@38552d5, session=ReactorNettyWebSocketSession[ id=3e0be929, uri=http://localhost:8080/]), headers={id=b09a89ff-f7be-1b43-6f62-40e5c0b5695a, timestamp=1543225109132}] 在 org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:163) 在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:475) 在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) 在 org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) 在 org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:183) 在 org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) 在 org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:205) 在 org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:55) 在 org.springframework.integration.endpoint.ReactiveStreamsConsumer$1.hookOnNext(ReactiveStreamsConsumer.java:138) 在 org.springframework.integration.endpoint.ReactiveStreamsConsumer$1.hookOnNext(ReactiveStreamsConsumer.java:127) 在 reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158) 在 reactor.core.publisher.FluxRetry$RetrySubscriber.onNext(FluxRetry.java:79) ... 原因:java.lang.IllegalStateException:[binaryWebSocketMessageChannel] 没有订阅者接受消息 在 org.springframework.util.Assert.state(Assert.java:94) 在 org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63) 在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ... 57 更多

谁能指出我任何方向来尝试解决问题? 另外,SI EIP 组件(路由器、转换器、过滤器、服务激活器)在哪些情况下会取消订阅频道?

供参考,频道类型为org.springframework.integration.channel.FluxMessageChannel

编辑:

我的流程如下所示:

WebSocketMessage -> router: (BINARY)  -> binaryWebSocketMessageChannel -> ...
                            (!BINARY) -> nullChannel

(我知道过滤器更适合这里,我打算稍后重构)

@ArtemBilan 带有示例的 repo 在这里:https://github.com/ioreskovic/Spring-Integration-flow-loses-subscriber

【问题讨论】:

  • 我们需要一些简单的项目来玩和重现。您提供的有关配置的信息太少。谢谢
  • 我将尝试在本周末之前提取这些部分并将其托管在 GitHub 上。谢谢你:)
  • @ArtemBilan 我用 repo、重现步骤和日志更新了问题
  • 你的应用看起来不错,但审查起来没那么简单:自定义代码太多。复制的先决条件也太复杂了。就不能在项目中想出junit test,把问题隔离在一个地方吗?我没有单独的机器来运行其他一些软件。
  • 没问题,明天第一件事。我刚刚发布了这个例子,因为那些导致这个错误的原始步骤

标签: java spring websocket spring-integration reactor-netty


【解决方案1】:

关键是FluxMessageChannel中的Publisher在那个Spring Integration版本中被取消了。

我们开始在5.1 版本中使用来自Reactor 3.2onErrorContinue()。要解决您的问题,最好考虑将您的应用程序升级到最新的 Spring Boot 2.1.1

作为我们的解决方法,您可以考虑在 BinaryWsmToBytesTransformer 中吞下一个异常,并且不要将其冒泡到 FluxMessageChannel 后面。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-06-22
    • 2017-05-05
    • 2021-01-31
    • 1970-01-01
    • 2020-09-09
    • 2016-12-31
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多