【问题标题】:Spring Integration Gateway VS AdaptersSpring 集成网关 VS 适配器
【发布时间】:2020-07-02 17:43:06
【问题描述】:

您好,我是 Spring 集成的新手

我检查了 Spring Integration 动态路由的示例。终于在这里找到了

Dynamic TCP Client

这里有行

@Component
@MessagingGateway(defaultRequestChannel = "toTcp.input")
public interface TcpClientGateway {
    byte[] send(String data, @Header("host") String host, @Header("port") int port);
}

private MessageChannel createNewSubflow(Message<?> message) {
        String host = (String) message.getHeaders().get("host");
        Integer port = (Integer) message.getHeaders().get("port");
        Assert.state(host != null && port != null, "host and/or port header missing");
        String hostPort = host + port;

        TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
        TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
        handler.setConnectionFactory(cf);
        IntegrationFlow flow = f -> f.handle(handler);
        IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
                this.flowContext.registration(flow)
                        .addBean(cf)
                        .id(hostPort + ".flow")
                        .register();
        MessageChannel inputChannel = flowRegistration.getInputChannel();
        this.subFlows.put(hostPort, inputChannel);
        return inputChannel;
    }

但是我改了

private MessageChannel createNewSubflow(Message<?> message) {
    String host = (String) message.getHeaders().get("host");
    Integer port = (Integer) message.getHeaders().get("port");
    Assert.state(host != null && port != null, "host and/or port header missing");
    String hostPort = host + port;

    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
    cf.setLeaveOpen(true);
    //cf.setSingleUse(true);

    ByteArrayCrLfSerializer  byteArrayCrLfSerializer =new ByteArrayCrLfSerializer();
    byteArrayCrLfSerializer.setMaxMessageSize(1048576);

    cf.setSerializer(byteArrayCrLfSerializer);
    cf.setDeserializer(byteArrayCrLfSerializer);

    TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
    tcpOutboundGateway.setConnectionFactory(cf);

    IntegrationFlow flow = f -> f.handle(tcpOutboundGateway);

    IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
        this.flowContext.registration(flow)
            .addBean(cf)
            .id(hostPort + ".flow")
            .register();
    MessageChannel inputChannel = flowRegistration.getInputChannel();

    this.subFlows.put(hostPort, inputChannel);
    return inputChannel;
}

使用请求/响应架构。它真的很好用,因为它提供了动态路由,而无需手动创建 tcp 客户端。

此时我需要一些帮助来改进我的方案。我的场景是这样的;

客户端向服务器发送一条消息并从服务器接收该消息的响应,但随后服务器需要向该客户端发送任意消息(类似于 GPS 位置更新信息)。当服务器开始向客户端发送这些消息时,会生成如下错误消息

错误 54816 --- [pool-2-thread-1] o.s.i.ip.tcp.TcpOutboundGateway : 无法关联响应 - ::58628:62fd67b6-af2d-42f1-9c4d-d232fbe9c8ca 没有待处理的回复

我检查了 spring 集成文档并注意到网关只与请求/响应一起工作,所以我知道我应该使用适配器,但我不知道如何将适配器与动态 tcp 客户端一起使用。

在这里我找到了类似的主题和一些回复,但无法达到我的目标或找到组合解决方案的示例。

Spring Integration TCP Spring integration TCP server push to client

【问题讨论】:

    标签: spring tcp spring-integration


    【解决方案1】:

    您只需要注册两个流程;一个用于输入;一个用于输出 - 问题在于将回复的响应关联起来,并将任意消息路由到网关以外的某个地方。

    我更新了此用例 on this branch 的示例。

    您可以在该分支的最后一次提交中看到更改;大多数更改是为了模拟您的服务器端。

    在客户端,我们只需注册两个流,并使用@ServiceActivator 方法获取入站消息;您可以通过连接 ID 识别它们来自哪个服务器。

    【讨论】:

    • 非常感谢 Gary,我期待您的支持
    • 我更新了我的答案,其中包含一个指向我更新示例的分支的链接。
    • 让我从 github 检查并尝试将您的解决方案应用于我的场景。我将在完成演示后立即提供详细的反馈。 BR,再次感谢您。
    • 嗨,加里,我继续并应用你的建议,现在我似乎可以接收任意消息,但我对 @ServiceActivator(inputChannel = "fromTcp") 的用法有点困惑,因为以前有 byte[]接口中的 send(...) 方法,因此响应本身返回给调用者,但现在我只有一种方法,称为 @ServiceActivator(inputChannel = "fromTcp") receiver (...) 所以我无法将结果消息返回给来电服务。我不确定我是否可以表达。
    • 嗨@GaryRussell我的意思是我在ServiceA中有两行byte[] result = toTCP.send(data1, host1, portX),在ServiceB中有byte[] result = toTCP.send(data2, host2, portY),将来可能会有其他的toTCP.send在另一个ServiceX中......但现在的情况是稍有改变,ServiceA 和 ServiceB 中仍有 toTcp.send 块,但我有一个 @ServiceActivator(inputChannel = "fromTcp") ...receiver(...),所以我无法关联。
    猜你喜欢
    • 2018-10-26
    • 1970-01-01
    • 1970-01-01
    • 2015-01-19
    • 1970-01-01
    • 1970-01-01
    • 2020-12-15
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多