【问题标题】:Spring Integration - How to implement an asynchronous TCP Socket requests/responses over the same connection?Spring Integration - 如何通过同一连接实现异步 TCP Socket 请求/响应?
【发布时间】:2017-07-03 14:15:19
【问题描述】:

我有一个 Python TCP Socket 服务器服务:

  • 一次只允许一个客户端连接;
  • 其输入流/输出流独立运行。

另一方面,我有一个使用 Spring Integration 的 Java Spring Boot 客户端应用程序。我实际的 TCP 套接字配置器 实现用途:

@MessagingGateway(defaultRequestChannel = REQUEST_CHANNEL, errorChannel = ERROR_CHANNEL)
public interface ClientGtw {
    Future<Response> send(Request request);
}

@Bean
@ServiceActivator(inputChannel = REQUEST_CHANNEL)
public MessageHandler outboundGateway(TcpNioClientConnectionFactory connectionFactory) {
    TcpOutboundGateway gateway = new TcpOutboundGateway();
    gateway.setConnectionFactory(connectionFactory);
    gateway.setRequestTimeout(TimeUnit.SECONDS.toMillis(timeout));
    gateway.setRemoteTimeout(TimeUnit.SECONDS.toMillis(timeout));
    return gateway;
}

@Bean
public TcpNioClientConnectionFactory clientConnectionFactory(AppConfig config) {    
    Host host = getHost(config);

    TcpNioClientConnectionFactory factory = new TcpNioClientConnectionFactory(host.name, host.port);
    factory.setSingleUse(false);
    factory.setSoTimeout((int) TimeUnit.SECONDS.toMillis(timeout));

    SerializerDeserializer sd = new SerializerDeserializer();
    factory.setDeserializer(sd);
    factory.setSerializer(sd);
    return factory;
}

这种实际方法工作正常,但是,当发送请求时,它会挂起连接,直到收到响应。这是一个问题,因为有时一个请求可能获得太多时间来接收响应,并且系统有其他请求传入,其响应可以更快地实现。我想独立地发送和接收尽可能多的请求和响应(在它们之间解耦)。传输的对象(序列化和反序列化)包含可以进行正确关联的密钥对。

TL;DR:如何在同一连接上实现异步请求/响应?

Spring TcpOutboundGateway javadoc 提到:为该用例使用一对出站/入站适配器。

所以,除了上面的声明:

第一次尝试

@Bean
public TcpInboundGateway inboundGateway(AbstractServerConnectionFactory connectionFactory) {
    TcpInboundGateway gateway = new TcpInboundGateway();
    gateway.setConnectionFactory(connectionFactory);
    gateway.setRequestTimeout(TimeUnit.SECONDS.toMillis(timeout));
    return gateway;
}

@Bean
public AbstractServerConnectionFactory serverFactory(AppConfig config) {
    Host host = getHost(config);
    AbstractServerConnectionFactory connectionFactory = new TcpNetServerConnectionFactory(host.port);
    connectionFactory.setSingleUse(true);
    connectionFactory.setSoTimeout(timeout);
    return connectionFactory;
}

请求被阻止,直到像以前一样传递响应。

第二次尝试

@Bean
public TcpInboundGateway inboundGateway(TcpNioClientConnectionFactory connectionFactory) {
    TcpInboundGateway gateway = new TcpInboundGateway();
    gateway.setConnectionFactory(connectionFactory);
    gateway.setRequestTimeout(TimeUnit.SECONDS.toMillis(timeout));
    gateway.setClientMode(true);
    return gateway;
}

org.springframework.integration.ip.tcp.connection.TcpNioClientConnectionFactory 只能被一个入站适配器使用

有什么线索吗?

【问题讨论】:

    标签: java spring sockets tcp spring-integration


    【解决方案1】:

    使用一对通道适配器而不是出站网关。除了使用MessagingGateway,您可以自己在应用程序中进行关联,或者您可以使用与tcp-client-server-multiplex sample app 中使用的相同技术。它使用聚合器将出站消息的副本与入站消息聚合,回复网关。

    它很旧,使用 XML 配置,但同样的技术也适用。

    <publish-subscribe-channel id="input" />
    
    <ip:tcp-outbound-channel-adapter id="outAdapter.client"
        order="2"
        channel="input"
        connection-factory="client" /> <!-- Collaborator -->
    
    <!-- Also send a copy to the custom aggregator for correlation and
         so this message's replyChannel will be transferred to the
         aggregated message.
         The order ensures this gets to the aggregator first -->
    <bridge input-channel="input" output-channel="toAggregator.client"
            order="1"/>
    
    <!-- Asynch receive reply -->
    <ip:tcp-inbound-channel-adapter id="inAdapter.client"
        channel="toAggregator.client"
        connection-factory="client" /> <!-- Collaborator -->
    
    <!-- dataType attribute invokes the conversion service, if necessary -->
    <channel id="toAggregator.client" datatype="java.lang.String" />
    
    <aggregator input-channel="toAggregator.client"
        output-channel="toTransformer.client"
        expire-groups-upon-completion="true"
        expire-groups-upon-timeout="true"
        discard-channel="noResponseChannel"
        group-timeout="1000"
        correlation-strategy-expression="payload.substring(0,3)"
        release-strategy-expression="size() == 2" />
    
    <channel id="noResponseChannel" />
    
    <service-activator input-channel="noResponseChannel" ref="echoService" method="noResponse" />
    
    <transformer input-channel="toTransformer.client"
        expression="payload.get(1)"/> <!-- The response is always second -->
    

    (这个简单的示例与前 3 个字节相关)。

    【讨论】:

    • 谢谢加里。我会处理您的建议并提供反馈。
    【解决方案2】:

    加里,感谢您的指导。

    要解决这个问题,首先了解Messaging Channel 类型很重要。

    所以,在配置器类中:

    @Bean(name = REQUEST_CHANNEL)
    public DirectChannel sender() {
        return new DirectChannel();
    }
    
    @Bean(name = RESPONSE_CHANNEL)
    public PollableChannel receiver() {
        return new QueueChannel();
    }
    
    @Bean
    @ServiceActivator(inputChannel = REQUEST_CHANNEL)
    public TcpSendingMessageHandler outboundClient(TcpNioClientConnectionFactory connectionFactory) {
        TcpSendingMessageHandler outbound = new TcpSendingMessageHandler();
        outbound.setConnectionFactory(connectionFactory);
        outbound.setRetryInterval(TimeUnit.SECONDS.toMillis(timeout));
        outbound.setClientMode(true);
        return outbound;
    }
    
    @Bean
    public TcpReceivingChannelAdapter inboundClient(TcpNioClientConnectionFactory connectionFactory) {
        TcpReceivingChannelAdapter inbound = new TcpReceivingChannelAdapter();
        inbound.setConnectionFactory(connectionFactory);
        inbound.setRetryInterval(TimeUnit.SECONDS.toMillis(timeout));
        inbound.setOutputChannel(receiver());
        inbound.setClientMode(true);
        return inbound;
    }
    

    这个scratch @Singleton 类说明了如何操作请求和响应(考虑到请求和响应包含一个用于关联它们的 UID):

    @Autowired
    private DirectChannel sender;
    
    @Autowired
    private PollableChannel receiver;
    
    private BlockingQueue<Request> requestPool = new LinkedBlockingQueue<>();
    
    private Map<String, Response> responsePool = Collections.synchronizedMap(new HashMap<>());
    
    @PostConstruct
    private void init() {
        new Receiver().start();
        new Sender().start();
    }
    
    /*
     * It can be called as many as necessary without hanging for a response
     */
    public void send(Request req) {
        requestPool.add(req);
    }
    
    /*
     * Check for a response until a socket timout
     */
    public Response receive(String key) {
        Response res = responsePool.get(key);
        if (res != null) {
            responsePool.remove(key);
        }
        return res;
    }
    
    private class Receiver extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    tcpReceive();
                    Thread.sleep(250);
                } catch (InterruptedException e) { }
            }
        }
        private void tcpReceive() {
            Response res = (Message<Response>) receiver.receive();
            if (res != null) {
                responsePool.put(res.getUID(), res);
            }
        }
    }
    
    private class Sender extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    tcpSend();
                    Thread.sleep(250);
                } catch (InterruptedException e) { }
            }
        }
        private void tcpSend() {
            Request req = requestPool.poll(125, TimeUnit.MILLISECONDS);
            if (req != null) {
                sender.send(MessageBuilder.withPayload(req).build());
            }
        }
    }
    

    更新

    我忘了提这个:

    @Bean
    public TcpNioClientConnectionFactory clientConnectionFactory(Config config) {
        // Get host properties
        Host host = getHost(config);
        // Create socket factory
        TcpNioClientConnectionFactory factory = new TcpNioClientConnectionFactory(host.name, host.port);
        factory.setSingleUse(false); // IMPORTANT FOR SINGLE CHANNEL
        factory.setSoTimeout((int) TimeUnit.SECONDS.toMillis(timeout));
        return factory;
    }
    

    请随意考虑。

    【讨论】:

    • 您能否提供一个工作示例,通过 spring 集成使用基于注释的 spring 配置创建 TCP 服务器套接字。我被困住了,你会拯救我的一天。 :-)
    猜你喜欢
    • 1970-01-01
    • 2021-11-20
    • 1970-01-01
    • 1970-01-01
    • 2020-08-12
    • 2016-02-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多