【发布时间】:2020-07-02 17:43:06
【问题描述】:
您好,我是 Spring 集成的新手
我检查了 Spring Integration 动态路由的示例。终于在这里找到了
这里有行
@Component
@MessagingGateway(defaultRequestChannel = "toTcp.input")
public interface TcpClientGateway {
byte[] send(String data, @Header("host") String host, @Header("port") int port);
}
private MessageChannel createNewSubflow(Message<?> message) {
String host = (String) message.getHeaders().get("host");
Integer port = (Integer) message.getHeaders().get("port");
Assert.state(host != null && port != null, "host and/or port header missing");
String hostPort = host + port;
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
handler.setConnectionFactory(cf);
IntegrationFlow flow = f -> f.handle(handler);
IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
this.flowContext.registration(flow)
.addBean(cf)
.id(hostPort + ".flow")
.register();
MessageChannel inputChannel = flowRegistration.getInputChannel();
this.subFlows.put(hostPort, inputChannel);
return inputChannel;
}
但是我改了
private MessageChannel createNewSubflow(Message<?> message) {
String host = (String) message.getHeaders().get("host");
Integer port = (Integer) message.getHeaders().get("port");
Assert.state(host != null && port != null, "host and/or port header missing");
String hostPort = host + port;
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
cf.setLeaveOpen(true);
//cf.setSingleUse(true);
ByteArrayCrLfSerializer byteArrayCrLfSerializer =new ByteArrayCrLfSerializer();
byteArrayCrLfSerializer.setMaxMessageSize(1048576);
cf.setSerializer(byteArrayCrLfSerializer);
cf.setDeserializer(byteArrayCrLfSerializer);
TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
tcpOutboundGateway.setConnectionFactory(cf);
IntegrationFlow flow = f -> f.handle(tcpOutboundGateway);
IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
this.flowContext.registration(flow)
.addBean(cf)
.id(hostPort + ".flow")
.register();
MessageChannel inputChannel = flowRegistration.getInputChannel();
this.subFlows.put(hostPort, inputChannel);
return inputChannel;
}
使用请求/响应架构。它真的很好用,因为它提供了动态路由,而无需手动创建 tcp 客户端。
此时我需要一些帮助来改进我的方案。我的场景是这样的;
客户端向服务器发送一条消息并从服务器接收该消息的响应,但随后服务器需要向该客户端发送任意消息(类似于 GPS 位置更新信息)。当服务器开始向客户端发送这些消息时,会生成如下错误消息
错误 54816 --- [pool-2-thread-1] o.s.i.ip.tcp.TcpOutboundGateway : 无法关联响应 - ::58628:62fd67b6-af2d-42f1-9c4d-d232fbe9c8ca 没有待处理的回复
我检查了 spring 集成文档并注意到网关只与请求/响应一起工作,所以我知道我应该使用适配器,但我不知道如何将适配器与动态 tcp 客户端一起使用。
在这里我找到了类似的主题和一些回复,但无法达到我的目标或找到组合解决方案的示例。
Spring Integration TCP Spring integration TCP server push to client
【问题讨论】:
标签: spring tcp spring-integration