【问题标题】:Webflux websocketclient, How to send multiple requests in same session[design client library]Webflux websocketclient,如何在同一个会话中发送多个请求[设计客户端库]
【发布时间】:2019-05-17 15:50:34
【问题描述】:

TL;DR;

我们正在尝试使用 spring webflux WebSocket 实现来设计一个 WebSocket 服务器。服务器具有通常的 HTTP 服务器操作,例如create/fetch/update/fetchall。使用 WebSockets,我们试图公开一个端点,以便客户端可以利用单个连接进行各种操作,因为 WebSockets 就是为此目的而设计的。使用 webflux 和 WebSockets 的设计是否正确?

加长版

我们正在启动一个项目,该项目将使用来自spring-webflux 的响应式 Web 套接字。我们需要构建一个响应式客户端库,消费者可以使用它来连接到服务器。

在服务器上,我们收到一个请求,读取一条消息,保存它并返回一个静态响应:

public Mono<Void> handle(WebSocketSession webSocketSession) {
    Flux<WebSocketMessage> response = webSocketSession.receive()
            .map(WebSocketMessage::retain)
            .concatMap(webSocketMessage -> Mono.just(webSocketMessage)
                    .map(parseBinaryToEvent) //logic to get domain object
                    .flatMap(e -> service.save(e))
                    .thenReturn(webSocketSession.textMessage(SAVE_SUCCESSFUL))
            );

    return webSocketSession.send(response);
}

在客户端,我们想在有人调用save方法时进行调用,并返回来自server的响应。

public Mono<String> save(Event message) {
    new ReactorNettyWebSocketClient().execute(uri, session -> {
      session
              .send(Mono.just(session.binaryMessage(formatEventToMessage)))
              .then(session.receive()
                      .map(WebSocketMessage::getPayloadAsText)
                      .doOnNext(System.out::println).then()); //how to return this to client
    });
    return null;
}

我们不确定如何设计这个。理想情况下,我们认为应该有

1) client.execute 应该只被调用一次并且以某种方式持有session。应使用同一会话在后续调用中发送数据。

2)我们在session.receive得到的服务器如何返回响应?

3) 如果fetchsession.receive 中的响应很大(不仅是静态字符串,而是事件列表),那又如何呢?

我们正在做一些研究,但我们无法在线找到合适的 webflux-websocket-client 文档/实施资源。有关如何前进的任何指示。

【问题讨论】:

    标签: java spring spring-webflux spring-websocket project-reactor


    【解决方案1】:

    不确定这种情况是否是您的问题? 我看到您正在发送静态通量响应(这是一个可关闭的流) 您需要一个打开的流来向该会话发送消息,例如您可以创建一个处理器

    public class SocketMessageComponent {
    private DirectProcessor<String> emitterProcessor;
    private Flux<String> subscriber;
    
    public SocketMessageComponent() {
        emitterProcessor = DirectProcessor.create();
        subscriber = emitterProcessor.share();
    }
    
    public Flux<String> getSubscriber() {
        return subscriber;
    }
    
    public void sendMessage(String mesage) {
        emitterProcessor.onNext(mesage);
    }
    

    }

    然后你就可以发送了

     public Mono<Void> handle(WebSocketSession webSocketSession) {
        this.webSocketSession = webSocketSession;
        return webSocketSession.send(socketMessageComponent.getSubscriber()
                .map(webSocketSession::textMessage))
                .and(webSocketSession.receive()
                        .map(WebSocketMessage::getPayloadAsText).log());
    }
    

    【讨论】:

    • 您好,感谢您的回复。我试过了,但是有没有办法区分服务器发送一个实体作为响应,例如fetchByIDfetchAll。您如何区分两者,特别是当您尝试将相同的连接/会话用于多个请求时。
    • 所以你需要发射不同类型的对象?
    • 嗨,很抱歉延迟回复。如问题中所述,我们正在尝试设计一个具有通常 http 服务器操作的 websocket 服务器,例如create/fetch/update。使用 websockets,我们试图公开一个端点,以便客户端可以利用单个连接进行各种操作,因为 websockets 就是为此目的。使用 webflux 和 websockets 的设计是否正确?
    • 我不认为 websocket 是最好的解决方案。套接字更多地用于实时数据。您正在为套接字提供 REST 工作,如果您想使用套接字,它将是一个 spageti 代码
    • 感谢您的回复。
    【解决方案2】:

    请!使用RSocket!

    这是绝对正确的设计,值得节省资源并为所有可能的操作只为每个客户端使用一个连接。

    但是,不要实现轮子并使用为您提供所有这些类型的通信的协议。

    • RSocket 有一个 request-response 模型,它允许您进行当今最常见的客户端-服务器交互。
    • RSocket 有一个 request-stream 通信模型,因此您可以满足您的所有需求并异步返回事件流并重用同一连接。 RSocket 将所有逻辑流映射到物理连接并返回,因此您不会感到自己这样做的痛苦。
    • RSocket 有更多的交互模型,例如 fire-and-forgetstream-stream 在以下情况下可能很有用 以两种方式发送数据流。

    如何在 Spring 中使用 RSocket

    其中一个选项是使用 RSocket 协议的 RSocket-Java 实现。 RSocket-Java 建立在 Project Reactor 之上,因此它自然适合 Spring WebFlux 生态系统。

    不幸的是,没有与 Spring 生态系统的特色集成。幸运的是,我花了几个小时提供了一个简单的RSocket Spring Boot Starter,它集成了 Spring WebFlux 和 RSocket,并公开了 WebSocket RSocket 服务器和 WebFlux Http 服务器。

    为什么 RSocket 是更好的方法?

    基本上,RSocket 隐藏了自己实现相同方法的复杂性。使用 RSocket,我们不必关心作为自定义协议和 Java 实现的交互模型定义。 RSocket 为我们将数据传递到特定的逻辑通道。它提供了一个内置客户端,可以将消息发送到同一个 WS 连接,因此我们不必为此发明自定义实现。

    使用RSocket-RPC 让它变得更好

    由于 RSocket 只是一个协议,它不提供任何消息格式,所以这个挑战是针对业务逻辑的。但是,有一个 RSocket-RPC 项目提供协议缓冲区作为消息格式,并重用与 GRPC 相同的代码生成技术。因此,使用 RSocket-RPC,我们可以轻松地为客户端和服务器构建一个 API,而无需关心传输和协议抽象。

    同样的 RSocket Spring Boot 集成也提供了 RSocket-RPC 使用的example

    好吧,这还没说服我,我还想要一个自定义的 WebSocket 服务器

    因此,为此,您必须自己实现该地狱。我之前已经做过一次,但我不能指出那个项目,因为它是一个企业项目。 不过,我可以分享一些代码示例,它们可以帮助您构建合适的客户端和服务器。

    服务器端

    处理程序和开放逻辑订阅者映射

    必须考虑的第一点是一个物理连接中的所有逻辑流都应该存储在某个地方:

    class MyWebSocketRouter implements WebSocketHandler {
    
      final Map<String, EnumMap<ActionMessage.Type, ChannelHandler>> channelsMapping;
    
    
      @Override
      public Mono<Void> handle(WebSocketSession session) {
        final Map<String, Disposable> channelsIdsToDisposableMap = new HashMap<>();
        ...
      }
    }
    

    上面的示例中有两张地图。第一个是您的路由映射,它允许您根据传入的消息参数来识别路由,等等。第二个是为请求流用例创建的(在我的情况下,它是活动订阅的映射),因此您可以发送一个创建订阅的消息帧,或者为您订阅特定操作并保留该订阅,以便一旦取消订阅执行操作,如果存在订阅,您将被取消订阅。

    使用处理器进行消息多路复用

    为了从所有逻辑流发回消息,您必须将消息多路复用到一个流。例如,使用 Reactor,您可以使用 UnicastProcessor

    @Override
    public Mono<Void> handle(WebSocketSession session) {
      final UnicastProcessor<ResponseMessage<?>> funIn = UnicastProcessor.create(Queues.<ResponseMessage<?>>unboundedMultiproducer().get());
      ...
    
      return Mono
        .subscriberContext()
        .flatMap(context -> Flux.merge(
          session
            .receive()
            ...
            .cast(ActionMessage.class)
            .publishOn(Schedulers.parallel())
            .doOnNext(am -> {
              switch (am.type) {
                case CREATE:
                case UPDATE:
                case CANCEL: {
                  ...
                }
                case SUBSCRIBE: {
                  Flux<ResponseMessage<?>> flux = Flux
                    .from(
                      channelsMapping.get(am.getChannelId())
                                     .get(ActionMessage.Type.SUBSCRIBE)
                                     .handle(am) // returns Publisher<>
                    );
    
                  if (flux != null) {
                    channelsIdsToDisposableMap.compute(
                      am.getChannelId() + am.getSymbol(), // you can generate a uniq uuid on the client side if needed
                      (cid, disposable) -> {
                        ...
    
                        return flux
                          .subscriberContext(context)
                          .subscribe(
                            funIn::onNext, // send message to a Processor manually
                            e -> {
                              funIn.onNext(
                                new ResponseMessage<>( // send errors as a messages to Processor here
                                  0,
                                  e.getMessage(),
                                  ...
                                  ResponseMessage.Type.ERROR
                                )
                              );
                            }
                          );
                      }
                    );
                  }
    
                  return;
                }
                case UNSABSCRIBE: {
                  Disposable disposable = channelsIdsToDisposableMap.get(am.getChannelId() + am.getSymbol());
    
                  if (disposable != null) {
                    disposable.dispose();
                  }
                }
              }
            })
            .then(Mono.empty()),
    
            funIn
                ...
                .map(p -> new WebSocketMessage(WebSocketMessage.Type.TEXT, p))
                .as(session::send)
          ).then()
        );
    }
    

    从上面的示例中我们可以看到,那里有很多东西:

    1. 消息应包含路线信息
    2. 消息应包含与其相关的唯一流 ID。
    3. 用于消息多路复用的单独处理器,其中错误也应该是消息
    4. 每个通道都应该存储在某个地方,在这种情况下,我们有一个简单的用例,每个消息可以提供一个Flux 消息或只是一个Mono(如果是单声道,它可以在服务器上更简单地实现侧,因此您不必保留唯一的流 ID)。
    5. 此示例不包括消息编码-解码,所以这个挑战留给您。

    客户端

    客户端也不是那么简单:

    处理会话

    为了处理连接,我们必须分配两个处理器,以便进一步使用它们来复用和解复用消息:

    UnicastProcessor<> outgoing = ...
    UnicastPorcessor<> incoming = ...
    (session) -> {
      return Flux.merge(
         session.receive()
                .subscribeWith(incoming)
                .then(Mono.empty()),
         session.send(outgoing)
      ).then();
    }
    

    将所有逻辑流保存在某处

    所有创建的流,无论是Mono 还是Flux,都应该存储在某个地方,以便我们能够区分与哪个流消息相关:

    Map<String, MonoSink> monoSinksMap = ...;
    Map<String, FluxSink> fluxSinksMap = ...;
    

    自MonoSink以来我们要保留两张map,而FluxSink没有相同的父接口。

    消息路由

    在上面的示例中,我们只考虑了客户端的初始部分。现在我们要构建一个消息路由机制:

    ...
    .subscribeWith(incoming)
    .doOnNext(message -> {
        if (monoSinkMap.containsKey(message.getStreamId())) {
            MonoSink sink = monoSinkMap.get(message.getStreamId());
            monoSinkMap.remove(message.getStreamId());
            if (message.getType() == SUCCESS) {
                sink.success(message.getData());
            }
            else {
                sink.error(message.getCause());
            }
        } else if (fluxSinkMap.containsKey(message.getStreamId())) {
            FluxSink sink = fluxSinkMap.get(message.getStreamId());
            if (message.getType() == NEXT) {
                sink.next(message.getData());
            }
            else if (message.getType() == COMPLETE) {
                fluxSinkMap.remove(message.getStreamId());
                sink.next(message.getData());
                sink.complete();
            }
            else {
                fluxSinkMap.remove(message.getStreamId());
                sink.error(message.getCause());
            }
        }
    })
    

    上面的代码示例展示了我们如何路由传入的消息。

    多路复用请求

    最后一部分是消息多路复用。为此,我们将介绍可能的发送者类 impl:

    class Sender {
        UnicastProcessor<> outgoing = ...
        UnicastPorcessor<> incoming = ...
    
        Map<String, MonoSink> monoSinksMap = ...;
        Map<String, FluxSink> fluxSinksMap = ...;
    
        public Sender () {
    

    //这里创建websocket连接,放上前面提到的代码 }

        Mono<R> sendForMono(T data) {
            //generate message with unique 
            return Mono.<R>create(sink -> {
                monoSinksMap.put(streamId, sink);
                outgoing.onNext(message); // send message to server only when subscribed to Mono
            });
        }
    
         Flux<R> sendForFlux(T data) {
             return Flux.<R>create(sink -> {
                fluxSinksMap.put(streamId, sink);
                outgoing.onNext(message); // send message to server only when subscribed to Flux
            });
         }
    }
    

    自定义实现总结

    1. 铁杆
    2. 没有实施背压支持,这可能是另一个挑战
    3. 很容易射中自己的脚

    要点

    1. 请使用 RSocket,不要自己发明协议,太难了!!!
    2. 要从 Pivotal 的人那里了解更多关于 RSocket 的信息 - https://www.youtube.com/watch?v=WVnAbv65uCU
    3. 从我的一次演讲中了解有关 RSocket 的更多信息 - https://www.youtube.com/watch?v=XKMyj6arY2A
    4. 在 RSocket 之上构建了一个名为 Proteus 的特色框架 - 您可能对此感兴趣 - https://www.netifi.com/
    5. 向 RSocket 协议核心开发者了解更多关于 Proteus 的信息 - https://www.google.com/url?sa=t&source=web&rct=j&url=https://m.youtube.com/watch%3Fv%3D_rqQtkIeNIQ&ved=2ahUKEwjpyLTpsLzfAhXDDiwKHUUUA8gQt9IBMAR6BAgNEB8&usg=AOvVaw0B_VdOj42gjr0YrzLLUX1E

    【讨论】:

    • 我刚开始使用 WebSockets,非常喜欢你的回答。你知道 STOMP,弹簧集成开箱即用吗?您如何将其与使用 RSocket 进行比较(如果这有任何意义的话)?
    • @RüdigerSchulz 最后一次 STOMP 更新(按照规范)是在 7 年前......对,它不是那么好。另一方面,Java 客户端/服务器的维护也已经过时。我建议看一下 RSocket。目前,Spring 团队正在完成 RSocket 和 Spring-Messaging 集成的最终确定,因此几个月后它将公开可用。欲了解更多信息->github.com/spring-projects/spring-framework/tree/master/…
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-04-07
    • 2023-01-26
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多