【问题标题】:Request/Response socket with spring integration具有弹簧集成的请求/响应套接字
【发布时间】:2017-08-30 15:21:12
【问题描述】:

我有一个 spring boot 服务器应用程序,我需要在其中创建一个 ssl 套接字并让客户端连接到它。 我希望能够向客户端发送一条消息(例如,在向我的服务器发出 REST API 请求时)并读取它发回的响应。

到目前为止,我设法做的是:

  1. 创建套接字并允许客户端连接(并保存来自 TcpEvent 的连接 ID)
  2. 根据请求在套接字上发送消息
  3. 客户端收到请求并返回响应

在这一点上 - 我无法阅读他们的回复(我确实看到他们使用 Wireshark 发送回复)。即使我配置了一个TcpReceivingChannelAdapter,它使用与TcpSendingMessageHandler相同的连接工厂。

从那时起 - 我可以发送请求,但客户端不会收到它们...我怀疑这是因为我没有处理他的第一个响应(我无法访问客户端代码来验证那个)。

这是我的代码:

Config.java

@EnableIntegration
@IntegrationComponentScan
@Configuration
public class SocketConfiguration implements ApplicationListener<TcpConnectionEvent> {

   private static final org.slf4j.Logger log = LoggerFactory.getLogger("SocketConfiguration");

   @Bean
   public AbstractServerConnectionFactory AbstractServerConnectionFactory() {
      TcpNetServerConnectionFactory tcpNetServerConnectionFactory = new TcpNetServerConnectionFactory(40003);
      DefaultTcpNetSSLSocketFactorySupport tcpNetSSLSocketFactory = tcpSocketFactorySupport();
      tcpNetServerConnectionFactory.setTcpSocketFactorySupport(tcpNetSSLSocketFactory);
      return tcpNetServerConnectionFactory;
   }


   @Bean
   public DefaultTcpNetSSLSocketFactorySupport tcpSocketFactorySupport() {
      DefaultTcpSSLContextSupport sslContextSupport = new DefaultTcpSSLContextSupport("keystore.jks",
            "trustStore.jks", "123456", "123456");
      sslContextSupport.setProtocol("TLSv1.2");
      DefaultTcpNetSSLSocketFactorySupport tcpSocketFactorySupport = new DefaultTcpNetSSLSocketFactorySupport(sslContextSupport);
      return tcpSocketFactorySupport;
   }

   @Bean
   public static MessageChannel getResponseChannel() {
      DirectChannel directChannel = new DirectChannel();
      directChannel.setComponentName("getResponseChannel");
      directChannel.setLoggingEnabled(true);
      return directChannel;
   }

   @Bean
   public static MessageChannel getInputMessageChannel() {
      DirectChannel directChannel = new DirectChannel();
      directChannel.setComponentName("inputMessageChannel");
      directChannel.setLoggingEnabled(true);
      return directChannel;
   }

   @Bean
   public MessageChannel invokeChannel() {
      return new DirectChannel();
   }

   @Bean  
   public TcpReceivingChannelAdapter in(AbstractServerConnectionFactory connectionFactory) {
      TcpReceivingChannelAdapter adapter = new TcpReceivingChannelAdapter();
      adapter.setOutputChannel(getInputMessageChannel());
      adapter.setConnectionFactory(connectionFactory);
      adapter.setSendTimeout(5000);
      return adapter;
   }

   @ServiceActivator(inputChannel="toClientChannel")
   @Bean
   public TcpSendingMessageHandler out(AbstractServerConnectionFactory connectionFactory) {
      TcpSendingMessageHandler tcpSendingMessageHandler = new TcpSendingMessageHandler();
      tcpSendingMessageHandler.setConnectionFactory(connectionFactory);
      tcpSendingMessageHandler.setLoggingEnabled(true);
      return tcpSendingMessageHandler;
   }

   @Transformer(inputChannel = "invokeChannel", outputChannel = "toClientChannel")
   public Message<String> headerBeforeSend(String message) throws Exception {
      log.debug("send message to socket: {}", message);
      Map.Entry<String, TcpConnection> connectionEntry = GetConnectionEntry();
      log.debug("connection id is: {}", connectionEntry.getKey());
      return MessageBuilder.withPayload(message)
         .setHeader(IpHeaders.CONNECTION_ID,connectionEntry.getKey())
         .build();
   }

   private static ConcurrentHashMap<String, TcpConnection> tcpConnections = new ConcurrentHashMap<>();

   @Override
   public void onApplicationEvent(TcpConnectionEvent tcpEvent) {
      TcpConnection source = (TcpConnection) tcpEvent.getSource();
      if (tcpEvent instanceof TcpConnectionOpenEvent) {
         log.info("Socket Opened " + source.getConnectionId());
         tcpConnections.put(tcpEvent.getConnectionId(), source);
      } else if (tcpEvent instanceof TcpConnectionCloseEvent) {
         log.info("Socket Closed " + source.getConnectionId());
         if(tcpConnections.containsKey(source.getConnectionId()))
            tcpConnections.remove(source.getConnectionId());
      } else if (tcpEvent instanceof TcpConnectionExceptionEvent) {
         log.error("Error {}",tcpEvent.getCause().getMessage());
         if(tcpConnections.containsKey(source.getConnectionId()))
            tcpConnections.remove(source.getConnectionId());
      }
   }
}

Controller.java

@RestController
@ControllerAdvice
@RequestMapping("/socket")
public class SocketController {
   private final Logger log = LoggerFactory.getLogger(getClass());

   @Inject
   MessageChannel invokeChannel;

   @LogAspect
   @PostMapping
   public ResponseEntity sendMessage(@RequestBody SendMessageRequest request) throws Exception {
      log.debug("Message is {}",request.get_message());
      String msg = "Some test message";
      MessagingTemplate template = new MessagingTemplate();
      template.send(invokeChannel, new GenericMessage<>(msg));
      return new ResponseEntity(HttpStatus.OK);
   } 
}

您能否检查我的配置并指导我进行正确的设置?

谢谢

【问题讨论】:

    标签: java spring-integration tcpsocket


    【解决方案1】:

    您应该确保发送和接收的消息已正确序列化和反序列化。默认情况下它是ByteArrayCrlfSerializer,它基于\r\n 消息终止符:https://docs.spring.io/spring-integration/docs/4.3.11.RELEASE/reference/html/ip.html#connection-factories

    【讨论】:

    • 感谢您的回复所以你说我的配置是正确的,我只是没有正确解析响应?它不会显示有关它的错误或其他一些信息吗?
    • 这似乎很奇怪,因为我的请求和响应都是 JSON 格式,并且我能够正确发送我的请求(即在客户端收到)
    • 我们看不到任何错误,因为您的侦听器可能只是等待正确的包终止符。您应该咨询客户的期望。另一个想法是您的客户端可能会收到一条消息并关闭套接字。我不能说你的配置是否正确:我只是相信你的话,它部分有效。不正确的包终止符是 TCP 通信解决方案中的典型错误
    • 我通过 JSON 请求发送并接收 JSON 响应... ByteArrayCrlfSerializer 可以吗?
    • 这不相关。您必须知道您的客户对终结者的期望。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-10-15
    • 2019-03-19
    • 2017-10-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-04-08
    相关资源
    最近更新 更多