【发布时间】:2018-11-28 12:33:33
【问题描述】:
我有消耗 org.springframework.web.reactive.socket.WebSocketMessage 的 SI 流,并对其进行了一些处理,其中包括使用 Netty 的 ByteBuf 处理它的有效负载。在某些时候,我的流程中发生了异常:
之后,处理所有二进制 Web 套接字消息失败,并出现以下异常:
2018-11-26T10:38:29,133 错误 --- [-server-epoll-7] o.s.i.h.LoggingHandler (:) org.springframework.messaging.MessageDeliveryException:无法将消息发送到通道“binaryWebSocketMessageChannel”;嵌套异常是 java.lang.IllegalStateException: The [binaryWebSocketMessageChannel] 没有订阅者接受消息,failedMessage=GenericMessage [payload=MyPayload(payload=org.springframework.web.reactive.socket.WebSocketMessage@38552d5, session=ReactorNettyWebSocketSession[ id=3e0be929, uri=http://localhost:8080/]), headers={id=b09a89ff-f7be-1b43-6f62-40e5c0b5695a, timestamp=1543225109132}] 在 org.springframework.integration.support.utils.IntegrationUtils.wrapInDeliveryExceptionIfNecessary(IntegrationUtils.java:163) 在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:475) 在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) 在 org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) 在 org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:183) 在 org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) 在 org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:205) 在 org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:55) 在 org.springframework.integration.endpoint.ReactiveStreamsConsumer$1.hookOnNext(ReactiveStreamsConsumer.java:138) 在 org.springframework.integration.endpoint.ReactiveStreamsConsumer$1.hookOnNext(ReactiveStreamsConsumer.java:127) 在 reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:158) 在 reactor.core.publisher.FluxRetry$RetrySubscriber.onNext(FluxRetry.java:79) ... 原因:java.lang.IllegalStateException:[binaryWebSocketMessageChannel] 没有订阅者接受消息 在 org.springframework.util.Assert.state(Assert.java:94) 在 org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63) 在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ... 57 更多谁能指出我任何方向来尝试解决问题? 另外,SI EIP 组件(路由器、转换器、过滤器、服务激活器)在哪些情况下会取消订阅频道?
供参考,频道类型为org.springframework.integration.channel.FluxMessageChannel
编辑:
我的流程如下所示:
WebSocketMessage -> router: (BINARY) -> binaryWebSocketMessageChannel -> ...
(!BINARY) -> nullChannel
(我知道过滤器更适合这里,我打算稍后重构)
@ArtemBilan 带有示例的 repo 在这里:https://github.com/ioreskovic/Spring-Integration-flow-loses-subscriber
【问题讨论】:
-
我们需要一些简单的项目来玩和重现。您提供的有关配置的信息太少。谢谢
-
我将尝试在本周末之前提取这些部分并将其托管在 GitHub 上。谢谢你:)
-
@ArtemBilan 我用 repo、重现步骤和日志更新了问题
-
你的应用看起来不错,但审查起来没那么简单:自定义代码太多。复制的先决条件也太复杂了。就不能在项目中想出junit test,把问题隔离在一个地方吗?我没有单独的机器来运行其他一些软件。
-
没问题,明天第一件事。我刚刚发布了这个例子,因为那些导致这个错误的原始步骤
标签: java spring websocket spring-integration reactor-netty