【问题标题】:Stateful Rsocket Application有状态的 Rsocket 应用程序
【发布时间】:2019-05-03 11:15:31
【问题描述】:

在我的项目中,我想让多个客户端连接到一个服务。我正在使用 java Rsocket 实现。

服务应该为每个客户端维护一个状态。现在,我可以通过一些标识符来管理客户端。这个选项我已经实现了。但我不想使用字符串手动管理会话。

所以另一个想法是通过 Rsocket 连接来识别客户端。有没有办法使用 Rsocket 通道来识别特定的客户端?

想象一个示例服务和几个客户端。每个客户端都有 Rsocket 通道,服务启动并运行。有没有办法使用 Rsocket 通道在服务器端识别这些客户端?如果您可以展示这种行为的编程示例,那将是惊人的。 谢谢!

编辑(更详细地描述案例)

这是我的例子。

我们目前使用了三个 CORBA 对象,如图所示:

  • Lo​​ginObject(通过 NamingService 检索到的引用)。客户端可以调用 login() 方法获取会话
  • Session 对象有多种方法可以查询有关当前服务上下文的详细信息,最重要的是获取 Transaction 对象
  • Transaction 对象可用于通过将 commandName 和键值对列表作为参数的通用方法执行各种命令。 客户端执行 n 个命令后,他可以提交或回滚事务(也可以通过 Transaction 对象上的方法)。

所以这里我们使用会话对象在我们的服务上执行事务。

现在我们决定从 CORBA 迁移到 Rsocket。因此,我们需要 Rsocket 微服务能够存储会话的状态,否则我们无法知道将要提交或回滚什么。这可以通过每个客户的单个发布者来完成吗?

【问题讨论】:

  • 不清楚您要做什么,大多数情况下不建议在 Web 服务器中保存状态 + 如果您正在使用通道,您可以为每个客户端创建 Publisher,那么为什么要这样做你需要状态吗?
  • @matanper 我用更详尽的描述编辑了这个问题:)
  • 我推荐你看看akka.io 这是一个非常强大的响应式框架。

标签: java reactive-programming rsocket


【解决方案1】:

这是我前几天制作的一个示例,它将使用 Netifi 的代理创建一个有状态的 RSocket: https://github.com/netifi/netifi-stateful-socket

不幸的是,您需要在本地构建我们的开发分支来试用 (https://github.com/netifi/netifi-java) - 如果您不想在本地构建它,那么应该在本周末之前发布包含代码的版本。

我也在研究一个纯 RSocket 示例,但是如果您想了解它会如何查看示例中的 StatefulSocket。它应该为您提供如何使用纯 RSocket 处理会话的线索。

关于您关于事务管理器的其他问题 - 您需要将您的事务与正在发出的 Reactive Streams 信号联系起来 - 如果您收到取消,您会回滚 onError,如果收到 onComplete,您会提交事务。 Flux/Mono 有一些副作用方法应该可以轻松处理。根据您的操作,您还可以使用 BaseSubscriber,因为它具有处理不同 Reactive Streams 信号的钩子。

谢谢, 罗伯特

【讨论】:

    【解决方案2】:

    一个恢复连接的例子,即维护服务器上的状态,已经登陆 rsocket-java repo

    https://github.com/rsocket/rsocket-java/commit/d47629147dd1a4d41c7c8d5af3d80838e01d3ba5

    恢复整个连接,包括与每个单独通道相关的任何状态等。

    有一个rsocket-cli 项目可以让您尝试一下。启动和停止socat进程,观察客户端和服务器的进度。

    $ socat -d TCP-LISTEN:5001,fork,reuseaddr TCP:localhost:5000
    
    $ ./rsocket-cli --debug --resume --server -i cli:time tcp://localhost:5000
    
    $ ./rsocket-cli -i client --stream --resume tcp://localhost:5001
    

    【讨论】:

    • 嗯,现在提交一个问题,服务器如何管理状态?我在那里看不到任何状态......
    • 这取决于你想要什么状态。它可以是任何一个 a) 流本质上是有状态的,例如计数器递增,b) 流具有状态运算符,c) 您使用项目反应器上下文,d) 您在应用程序中使用有状态的东西,例如您使用会话 id 键入的单例静态 Map。
    【解决方案3】:

    从您的描述看来,channel 效果最好,我之前没有使用过频道,所以我不能保证(抱歉)。但我建议你尝试这样的事情:

    事务控制器:

    public class TransactionController implements Publisher<Payload> {
    
        List<Transaction> transcations = new ArrayList<>();
    
        @Override
        public void subscribe(Subscriber<? super Payload> subscriber) {
    
        }
    
        public void processPayload(Payload payload) {
            // handle transcations...
        }
    }
    

    并且在您的RSocket 实现中覆盖requestChannel

    @Override
    public Flux<Payload> requestChannel(Publisher<Payload> payloads) {
        // Create new controller for each channel
        TranscationController cntrl = new TranscationController();
        Flux.from(payloads)
          .subscribe(cntrl::processPayload);
        return Flux.from(cntrl);
    }
    

    【讨论】:

    • 嘿,我对 Rsocket 完全陌生,所以我可能会问一些业余的问题...假设我创建了一个服务,它实现了 protobuf 生成的服务。该服务定义了一个方法:public Mono&lt;TransactionResult&gt; transaction(Publisher&lt;Command&gt; messages, ByteBuf metadata),它派生自 protobuf 的 rpc transaction (stream Command) returns (TransactionResult) {},应该使用 Mono 处理客户端请求流,无论事务是提交还是取消。
    • 此时,我需要将 TransactionController 逻辑添加到服务的:Mono&lt;TransactionResult&gt; transaction(Publisher&lt;Command&gt; messages, ByteBuf metadata) 方法中,就像您在 requestChannel-example 中所做的那样。我说的对吗?
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2011-02-18
    • 1970-01-01
    • 1970-01-01
    • 2011-06-19
    • 1970-01-01
    • 1970-01-01
    • 2021-11-08
    相关资源
    最近更新 更多