【问题标题】:How to implement a keep alive connection on a TCP connection using spring integration?如何使用spring集成在TCP连接上实现保持连接?
【发布时间】:2018-04-05 16:59:30
【问题描述】:

我有一个使用 spring 集成 TCP 构建的 TCP 客户端,服务器支持保持活动消息(ping/pong 样式)。连接是使用CachingClientConnectionFactory 配置的,我想利用此服务器功能。这是我的 bean 配置:

private static final int SERIALIZER_HEADER_SIZE = 2;

/**
 * Serializer used by connection factory to send and receive messages
 */
@Bean
public ByteArrayLengthHeaderSerializer byteArrayLengthHeaderSerializer() {
    return new ByteArrayLengthHeaderSerializer(SERIALIZER_HEADER_SIZE);
}

@Bean
public AbstractClientConnectionFactory tcpClientConnectionFactory() {
    TcpNetClientConnectionFactory connFactory =
        new TcpNetClientConnectionFactory(props.getUrl(), props.getPort());
    connFactory.setSerializer(byteArrayLengthHeaderSerializer());
    connFactory.setDeserializer(byteArrayLengthHeaderSerializer());
    connFactory.setSoTimeout(props.getSoTimeout());
    if (props.isUseSSL()) {
        connFactory.setTcpSocketFactorySupport(new DefaultTcpNetSSLSocketFactorySupport(() -> {
            return SSLContext.getDefault();
        }));
    }

    return connFactory;
}

/**
 * Connection factory used to create TCP client socket connections
 */
@Bean
public AbstractClientConnectionFactory tcpCachedClientConnectionFactory() {
    CachingClientConnectionFactory cachingConnFactory =
        new CachingClientConnectionFactory(tcpClientConnectionFactory(), props.getMaxPoolSize());
    cachingConnFactory.setConnectionWaitTimeout(props.getMaxPoolWait());
    return cachingConnFactory;
}

使用此处发布的解决方案Configure keep alive to keep connection alive all the time 我可以保持连接打开,但我也想利用这些服务器保持活动消息并不时发送这些消息以检查连接是否仍然存在。这可以提高客户端的性能,因为如果套接字关闭,它不需要重新连接/创建新连接。

基于此,是否有人对如何使用 spring 集成来实现这一点提出建议?

【问题讨论】:

  • 不清楚你的意思。您可以将soKeepAlive 设置为true,这样操作系统将通过发送ping 来保持套接字打开。然后,如果您不设置soTimeout,则套接字将无限期保持打开状态。
  • 服务器会收到类似KEEP_ALIVE_REQUEST 的信息,并会发回KEEP_ALIVE_RESPONSE。我的问题是关于使用它来保持连接打开,但根据您的回复 soKeepAlivesoTimeout 一起可以解决问题。
  • 更新:@GaryRussell 服务器将在 30 秒不活动后关闭套接字。基于此,spring 集成是否有任何功能可以用来发送那些特定的保持活动消息(以后台方式),以便可以重用客户端套接字?

标签: java spring sockets tcp spring-integration


【解决方案1】:

使用简单的客户端连接工厂时,很容易使用@InboundChannelAdapter 设置应用程序级心跳消息。

简单示例:

@SpringBootApplication
public class So46918267Application {

    public static void main(String[] args) throws IOException {
        // Simulated Server
        final ServerSocket server = ServerSocketFactory.getDefault().createServerSocket(1234);
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(() -> {
            try {
                Socket socket = server.accept();
                BufferedReader reader = new BufferedReader(new InputStreamReader(socket.getInputStream()));
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line);
                    if (line.equals("keep_alive")) {
                        socket.getOutputStream().write("OK\r\n".getBytes());
                    }
                }
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        });
        ConfigurableApplicationContext context = SpringApplication.run(So46918267Application.class, args);
        System.out.println("Hit enter to terminate");
        System.in.read();
        executor.shutdownNow();
        context.close();
        server.close();
    }

    @Bean
    public TcpNetClientConnectionFactory client() {
        return new TcpNetClientConnectionFactory("localhost", 1234);
    }

    @ServiceActivator(inputChannel = "toTcp")
    @Bean
    public TcpOutboundGateway gateway() {
        TcpOutboundGateway gateway = new TcpOutboundGateway();
        gateway.setConnectionFactory(client());
        return gateway;
    }

    // HEARTBEATS

    private final Message<?> heartbeatMessage = MessageBuilder.withPayload("keep_alive")
            .setReplyChannelName("heartbeatReplies")
            .build();

    @InboundChannelAdapter(channel = "toTcp", poller = @Poller(fixedDelay = "25000"))
    public Message<?> heartbeat() {
        return this.heartbeatMessage;
    }

    @ServiceActivator(inputChannel = "heartbeatReplies")
    public void reply(byte[] reply) {
        System.out.println(new String(reply));
    }

}

但是,当使用CachingClientConnectionFactory 时,不清楚为什么要保持空闲连接池处于打开状态。但是,池的工作方式是将空闲连接保留在队列中,因此每个请求都将转到最旧的连接,并将连接返回到队列的末尾。

添加maxMessagesPerPoll 将在每次轮询时发出该数量的消息,并且...

@InboundChannelAdapter(channel = "toTcp", 
    poller = @Poller(fixedDelay = "25000", maxMessagesPerPoll = "5"))

最多可保持 5 个连接处于打开状态。它不会打开新连接(如果至少有一个),但如果池包含 5 个或更多连接,则至少 5 个将保持打开状态。如果没有打开的连接,它只会打开一个。

【讨论】:

  • 非常感谢您的快速回复,这正是我想要的。每次客户端必须打开一个新连接时,都需要一些时间,因此我们希望在池中保持一些打开的连接。
猜你喜欢
  • 2017-10-02
  • 1970-01-01
  • 1970-01-01
  • 2016-06-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-12-12
  • 1970-01-01
相关资源
最近更新 更多