【Question title】: How to support WebSocket transport with a Spring Boot RSocket server?
【Posted】: 2020-01-10 19:32:15
【Question description】:

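TL;DR: What is required to configure a Spring Boot application that exposes an RSocket interface supporting the WebSocket transport?


I am learning RSocket and Spring Boot at the same time, so please bear with me.

So far I have been able to build a very simple, contrived Spring Boot application that consumes an API exposed by a second Spring Boot application, using RSocket as the protocol. However, I have only been able to achieve this with TcpClientTransport.

From my perspective, the WebSocket transport is more likely to be used and is more useful for client->server architectures. However, I have not found any working examples or documentation on how to properly configure a Spring Boot application to accept RSocket messages over WebSocket.

Strangely, in my tests the consumer (client) does seem to open a WebSocket connection to the server/producer, but the "handshake" appears to hang and the connection is never fully established. I have tested with both the JavaScript libraries (rsocket-websocket-client, rsocket-rpc-core, etc.) and the Java library (io.rsocket.transport.netty.client.WebsocketClientTransport), and the server exhibits the same behavior regardless.

To reiterate: with TcpClientTransport I can connect to the server and invoke requests just fine, but with WebsocketClientTransport the connection is never established.

What is required of a Spring Boot application that aims to support RSocket by way of WebsocketClientTransport, beyond using spring-boot-starter-rsocket as a dependency?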

Server


pom.xml

...

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.0.M5</version>
    <relativePath/> <!-- lookup parent from repository -->
</parent>

...

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-rsocket</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

...

application.properties

spring.rsocket.server.port=8081
management.endpoints.enabled-by-default=true
management.endpoints.web.exposure.include=*

SpringBootRSocketServerApplication.java

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringBootRSocketServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringBootRSocketServerApplication.class, args);
    }
}

UserRSocketController.java

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;
import java.util.Random;
import java.util.stream.Stream;

@Slf4j
@Controller
public class UserRSocketController {

    @Autowired
    private UserRepository userRepository;

    @MessageMapping("usersList")
    public Mono<List<User>> usersList() {
        log.info("Handling usersList request.");
        return Mono.just(this.userRepository.getUsers());
    }

    @MessageMapping("usersStream")
    Flux<User> usersStream(UserStreamRequest request) {
        log.info("Handling request for usersStream.");
        List<User> users = userRepository.getUsers();
        Stream<User> userStream = Stream.generate(() -> {
            Random rand = new Random();
            return users.get(rand.nextInt(users.size()));
        });
        return Flux.fromStream(userStream).delayElements(Duration.ofSeconds(1));
    }

    @MessageMapping("userById")
    public Mono<User> userById(GetUserByIdRequest request) {
        log.info("Handling request for userById id: {}.", request.getId());
        return Mono.just(this.userRepository.getUserById(request.getId()));
    }
}

Startup log

 :: Spring Boot ::             (v2.2.0.M5)

2019-09-08 21:40:02,986 INFO  [main] org.springframework.boot.StartupInfoLogger: Starting SpringBootRSocketServerApplication on REDACTED with PID 22540 (REDACTED)
2019-09-08 21:40:02,988 INFO  [main] org.springframework.boot.SpringApplication: No active profile set, falling back to default profiles: default
2019-09-08 21:40:04,103 INFO  [main] org.springframework.boot.actuate.endpoint.web.EndpointLinksResolver: Exposing 14 endpoint(s) beneath base path '/actuator'
2019-09-08 21:40:04,475 INFO  [main] org.springframework.boot.rsocket.netty.NettyRSocketServer: Netty RSocket started on port(s): 8081
2019-09-08 21:40:04,494 INFO  [main] org.springframework.boot.web.embedded.netty.NettyWebServer: Netty started on port(s): 8080
2019-09-08 21:40:04,498 INFO  [main] org.springframework.boot.StartupInfoLogger: Started SpringBootRSocketServerApplication in 1.807 seconds (JVM running for 2.883)

Consumer/Client


ClientConfiguration.java

import io.rsocket.RSocket;
import io.rsocket.RSocketFactory;
import io.rsocket.frame.decoder.PayloadDecoder;
import io.rsocket.transport.ClientTransport;
//import io.rsocket.transport.netty.client.TcpClientTransport;
import io.rsocket.transport.netty.client.WebsocketClientTransport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.rsocket.MetadataExtractor;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.util.MimeTypeUtils;

@Configuration
public class ClientConfiguration {

    @Bean
    public RSocket rSocket() {
        // ClientTransport transport = TcpClientTransport.create(8081);
        // ^--- TCPTransport works fine

        ClientTransport transport = WebsocketClientTransport.create(8081);
        // ^--- Connection hangs and application startup stalls

        return RSocketFactory
            .connect()
            .mimeType(MetadataExtractor.ROUTING.toString(), MimeTypeUtils.APPLICATION_JSON_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .transport(transport)
            .start()
            .block();
    }

    @Bean
    RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
        return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
    }
}

Startup log

 :: Spring Boot ::             (v2.2.0.M5)

2019-09-08 21:40:52,331 INFO  [main] org.springframework.boot.StartupInfoLogger: Starting SpringBootRsocketConsumerApplication on REDACTED with PID 18904 (REDACTED)
2019-09-08 21:40:52,334 INFO  [main] org.springframework.boot.SpringApplication: No active profile set, falling back to default profiles: default

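【Comments】:

    Tags: java spring-boot spring-mvc websocket rsocket


    【Solution 1】:

    You only need two things to have an RSocket application expose an endpoint over the WebSocket transport:

    First, you need both the webflux and rsocket dependencies, since you will probably also need to serve web pages and static resources: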

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>
    

    Then you need to configure the RSocket server accordingly in your application.properties file:

    #server.port=8080 this is already the default
    spring.rsocket.server.transport=websocket
    spring.rsocket.server.mapping-path=/rsocket
    

    You can find more about this in the Spring Boot reference documentation about RSocket.

    WebSocket clients can now connect to ws://localhost:8080/rsocket
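
The endpoint URL above is just the web server port combined with the RSocket mapping path. A minimal sketch of that composition in plain Java follows; the class name RSocketWsUri and its build method are hypothetical helpers for illustration, not part of Spring Boot or RSocket.

```java
import java.net.URI;

// Hypothetical helper (not a Spring or RSocket API): derives the WebSocket URL
// a client should use from the values set in application.properties.
class RSocketWsUri {

    // port        -> value of server.port (8080 by default)
    // mappingPath -> value of spring.rsocket.server.mapping-path
    static URI build(String host, int port, String mappingPath) {
        // Tolerate a mapping path written without its leading slash
        String path = mappingPath.startsWith("/") ? mappingPath : "/" + mappingPath;
        return URI.create("ws://" + host + ":" + port + path);
    }

    public static void main(String[] args) {
        // prints ws://localhost:8080/rsocket
        System.out.println(build("localhost", 8080, "/rsocket"));
    }
}
```

The point of the sketch is that the client targets server.port, not a separate RSocket port, when the RSocket endpoint is mapped into the WebFlux server.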

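    Note that, as of the current 2.2.0 SNAPSHOT, the RSocket protocol has evolved and the rsocket-js library is still catching up, especially regarding metadata support. You will find a working sample here as well.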
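
As background on the metadata point: in the RSocket routing metadata extension (MIME type message/x.rsocket.routing.v0), a route is carried as a one-byte length followed by the UTF-8 bytes of the route name. The sketch below shows that byte layout in plain Java. RoutingMetadata is a hypothetical helper, and it assumes a single route tag and a server that expects this MIME type for metadata; it is not how any particular client library implements it.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper: encodes one route as a length-prefixed tag,
// following the RSocket routing metadata extension layout.
class RoutingMetadata {

    static byte[] encode(String route) {
        byte[] name = route.getBytes(StandardCharsets.UTF_8);
        if (name.length > 255) {
            // The length prefix is a single unsigned byte
            throw new IllegalArgumentException("route longer than 255 bytes");
        }
        byte[] out = new byte[name.length + 1];
        out[0] = (byte) name.length;                 // tag length
        System.arraycopy(name, 0, out, 1, name.length); // tag bytes
        return out;
    }

    public static void main(String[] args) {
        byte[] encoded = encode("userById");
        // prints 9: one length byte plus the eight bytes of "userById"
        System.out.println(encoded.length);
    }
}
```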

    On the Java client side, Spring Boot provides an RSocketRequester.Builder that is already configured and customized to your needs with codecs and interceptors:

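    import java.net.URI;

    import org.springframework.messaging.rsocket.RSocketRequester;
    import org.springframework.stereotype.Component;

    @Component
    public class MyService {

        private final RSocketRequester rsocketRequester;

        // Spring Boot injects a pre-configured RSocketRequester.Builder
        public MyService(RSocketRequester.Builder builder) {
            // Connect over WebSocket to the endpoint configured above;
            // block() waits until the connection is established
            this.rsocketRequester = builder
                    .connectWebSocket(URI.create("ws://localhost:8080/rsocket"))
                    .block();
        }
    }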
    

    【Discussion】:

      【Solution 2】:

      According to this blog post, the correct port to connect to is the one configured via server.port=8080.

      Server configuration

      server.port=8080
      spring.rsocket.server.port=8081
      spring.rsocket.server.mapping-path=/ws
      spring.rsocket.server.transport=websocket
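
One way to check which port actually completes the WebSocket upgrade is a quick probe with the JDK's built-in WebSocket client (Java 11+). This is only a diagnostic sketch: WebSocketProbe and canUpgrade are hypothetical names, the URLs in main assume the configuration above, and a timeout is used because a handshake against a non-WebSocket port may hang rather than fail.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.WebSocket;
import java.time.Duration;
import java.util.concurrent.TimeUnit;

// Hypothetical diagnostic helper: reports whether a WebSocket opening
// handshake completes at the given URL within the timeout.
class WebSocketProbe {

    static boolean canUpgrade(URI uri, Duration timeout) {
        try {
            WebSocket ws = HttpClient.newHttpClient()
                    .newWebSocketBuilder()
                    .connectTimeout(timeout)
                    .buildAsync(uri, new WebSocket.Listener() { })
                    // Extra margin on top of the connect timeout
                    .get(timeout.toMillis() + 1000, TimeUnit.MILLISECONDS);
            ws.abort(); // handshake succeeded, drop the connection
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (Exception e) {
            // Connection refused, handshake rejected, or timed out
            return false;
        }
    }

    public static void main(String[] args) {
        Duration timeout = Duration.ofSeconds(3);
        System.out.println("8080/ws: " + canUpgrade(URI.create("ws://127.0.0.1:8080/ws"), timeout));
        System.out.println("8081/ws: " + canUpgrade(URI.create("ws://127.0.0.1:8081/ws"), timeout));
    }
}
```

A successful upgrade only shows that the port speaks WebSocket; the RSocket setup frame is still sent by the real client afterwards.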
      

      Java consumer client configuration

      import io.rsocket.RSocket;
      import io.rsocket.RSocketFactory;
      import io.rsocket.frame.decoder.PayloadDecoder;
      import io.rsocket.transport.netty.client.WebsocketClientTransport;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;
      import org.springframework.messaging.rsocket.MetadataExtractor;
      import org.springframework.messaging.rsocket.RSocketRequester;
      import org.springframework.messaging.rsocket.RSocketStrategies;
      import org.springframework.util.MimeTypeUtils;
      
      import java.net.URI;
      
      @Configuration
      public class ClientConfiguration {
      
          @Bean
          public RSocket rSocket() {
              URI websocketUri = URI.create("ws://127.0.0.1:8080/ws");
              WebsocketClientTransport ws = WebsocketClientTransport.create(websocketUri);
              return RSocketFactory
                  .connect()
                  .mimeType(
                      MetadataExtractor.ROUTING.toString(),
                      MimeTypeUtils.APPLICATION_JSON_VALUE)
                  .frameDecoder(PayloadDecoder.ZERO_COPY)
                  .transport(ws)
                  .start()
                  .block();
          }
      
          @Bean
          RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
              return RSocketRequester.wrap(
                  rSocket(),
                  MimeTypeUtils.APPLICATION_JSON,
                  MetadataExtractor.ROUTING,
                  rSocketStrategies);
          }
      }
      

      JavaScript client configuration

      import { RSocketClient, JsonSerializers } from 'rsocket-core';
      import RSocketWebSocketClient from 'rsocket-websocket-client';
      
      const transport = new RSocketWebSocketClient({
          url: 'ws://127.0.0.1:8080/ws'
      });
      
      const client = new RSocketClient({
          // send/receive JSON objects instead of strings/buffers
          serializers: JsonSerializers,
          setup: {
              // ms between keepalive frames sent to the server
              keepAlive: 60000,
      
              // ms timeout if no keepalive response
              lifetime: 180000,
      
              // format of `data`
              dataMimeType: 'application/json',
      
              // format of `metadata`
              metadataMimeType: 'application/json',
          },
          transport,
      });
      
      client.connect().then((rsocket) => {
          // work with rsocket
      });
      

      【Discussion】:
