【问题标题】:How to resume session with RSocket when RSocket server failedRSocket 服务器失败时如何恢复与 RSocket 的会话
【发布时间】:2020-01-28 00:40:56
【问题描述】:

好的,我是 RSocket 的新手。我正在尝试创建一个简单的 RSocket 客户端和简单的 RSocket 服务器。从我所做的研究来看,RSocket 支持恢复:

它特别有用,因为当发送包含有关最后接收帧的信息的 RESUME 帧时,客户端能够恢复连接并仅请求它尚未收到的数据,从而避免服务器上不必要的负载和浪费尝试检索已检索的数据的时间。

它还说客户端是负责启用恢复的人。我的问题是如何启用此恢复以及如何发送该 RESUME 帧。我有功能正常的客户端和服务器,但是如果我关闭服务器并重新启动它,什么都不会发生,稍后当客户端再次尝试与服务器通信时,它会抛出:java.nio.channels.ClosedChannelException。

这是我的客户端配置:

@Configuration
public class ClientConfiguration {

/**
 * Defining the RSocket client to use tcp transport on port 7000
 */
@Bean
public RSocket rSocket() {
    return RSocketFactory
            .connect()
            .resumeSessionDuration(Duration.ofDays(10))
            .mimeType(MimeTypeUtils.APPLICATION_JSON_VALUE, MimeTypeUtils.APPLICATION_JSON_VALUE)
            .frameDecoder(PayloadDecoder.ZERO_COPY)
            .transport(TcpClientTransport.create(7000))
            .start()
            .block();
}

/**
 * RSocketRequester bean which is a wrapper around RSocket
 * and it is used to communicate with the RSocket server
 */
@Bean
RSocketRequester rSocketRequester(RSocketStrategies rSocketStrategies) {
    return RSocketRequester.wrap(rSocket(), MimeTypeUtils.APPLICATION_JSON, MimeTypeUtils.APPLICATION_JSON, rSocketStrategies);
}

}

这是一个 RestController,我从它开始与 rsocket 服务器通信:

@RestController
public class UserDataRestController {

private final RSocketRequester rSocketRequester;

public UserDataRestController(RSocketRequester.Builder rSocketRequester) {
    this.rSocketRequester = rSocketRequester.connectTcp("localhost", 7000).block();
}

@GetMapping(value = "/feed/{firstName}", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Publisher<Person> feed(@PathVariable("firstName") String firstName) {
    return rSocketRequester
            .route("feedPersonData")
            .data(new PersonDataRequest(firstName))
            .retrieveFlux(Person.class);
}

}

【问题讨论】:

    标签: java spring-boot rsocket


    【解决方案1】:

    由于会话存储在内存中,服务器重新启动后您无法恢复。见io.rsocket.resume.SessionManager#sessions

    但是,如果您重新连接到同一台服务器,您仍然可以保护自己免受网络问题的影响。而且您不必发送 RESUME 帧,客户端会为您完成。

    你应该配置服务器:

    @Bean
    ServerRSocketFactoryProcessor serverRSocketFactoryProcessor() {
        return RSocketFactory.ServerRSocketFactory::resume;
    }
    

    还有客户io.rsocket.RSocketFactory.ClientRSocketFactory#resume

    你可以找到几乎完整的例子here

    【讨论】:

    • 这行得通!非常感谢。即使在服务器重新启动后,客户端也能够与服务器通信。
    【解决方案2】:

    @Alexander Pankin 提供的代码现已弃用。我用这段代码来配置恢复一个服务器:

        @Bean
        RSocketServerCustomizer rSocketResume() {
            Resume resume =
                    new Resume()
                            .sessionDuration(Duration.ofMinutes(15))
                            .retry(
                                    Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(5))
                                            .doBeforeRetry(s -> log.debug("Disconnected. Trying to resume...")));
            return rSocketServer -> rSocketServer.resume(resume);
        }
    

    【讨论】:

    • 我认为重试只适用于客户端。
    猜你喜欢
    • 1970-01-01
    • 2021-05-15
    • 2022-10-24
    • 1970-01-01
    • 2020-09-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-10
    相关资源
    最近更新 更多