【问题标题】:Transactions / request-response-pattern in flow based/reactive programming基于流/反应式编程中的事务/请求-响应模式
【发布时间】:2019-11-07 06:24:09
【问题描述】:

所以我最近几天一直在阅读有关基于流的编程 (FBP) 的内容,并且我还一直在阅读 J. Paul Morrison 的有关它的书。但是,我觉得我仍然无法真正理解它。一般概念是您将编程视为某种装配线,其中您的组件将一些数据包作为输入并产生一些数据包作为输出。您可以连接这些组件并通过网络传输数据包。虽然我完全了​​解这如何适用于 ETL 类型的应用程序或批处理,但我不知道如何使用它处理同步请求/响应模式或数据库事务之类的事情。

例如,假设我有一个实现为 FPB 的 Web 服务器。这个网络服务器有一个 GET /user/{id} ,它应该返回一个带有一些用户信息的 JSON。它还有一个 POST /user/{id} ,您可以在其中通过将一些 JSON 发送回服务器来更新用户。所以这就是我想象的这个流程的样子:

我尝试拥有许多可重用的组件,而不是将处理请求的整个逻辑放入单个组件中。因此,有一个 HTTP 服务器组件将请求发送到调度程序组件,然后调度程序组件将请求分派到后续流中。在每个流程中,请求由通用“请求解析器”组件解析,该组件将请求的各个部分输出到流程的其余部分。

上半部分非常简单,我从 DB 中读取具有给定 ID 的用户实体,将对象序列化为 JSON,然后将其发回。然而,此时我们实际上并没有对 HTTP 请求的引用,那么我怎么知道将这个请求发送到哪里呢?

在下部,我们有一些额外的复杂性,因为我想以事务方式写入数据库。因此,首先启动一个事务(并行地将请求主体解析为某个对象),然后从数据库中检索用户对象并与来自请求的输入合并。最后,它被写回数据库并提交事务。最后,一些“OK”状态响应调用者。在这里,我还有一个额外的问题,即在提交事务时,我真的不知道要提交哪个事务。当然,在发送响应时,我不知道将其发送到哪个请求。

所以这两个问题似乎有一些共同点——一种跨越许多组件的“上下文”。在一个示例中,它是 HTTP 请求/响应上下文,而另一个示例是事务上下文。在常规编程中,这些上下文通常在线程级别处理。由于请求在单个线程中运行,因此事务和请求上下文绑定到线程本地,因此只要所有内容都在同一个线程中运行,它们就可以在任何地方访问。

在基于流的编程中,每个组件都独立运行,理想情况下是在单独的线程上运行。这实际上是一个关键的事情,因为它允许并行化和有效使用多个处理器。但是,当该线程本地上下文不再存在时,您如何在基于流的编程中处理这些问题?如果正确处理错误(我在示例中省略了),这将变得更加复杂。

我认为,当您进行反应式编程时,大部分处理都是异步和多线程的,您也会遇到同样的问题,所以我想知道是否有模式可以处理这个问题。您是否有反应式编程或基于流的编程的实际经验,并且对我如何解决这个问题有一些提示?

【问题讨论】:

    标签: multithreading language-agnostic reactive-programming flow-based-programming


    【解决方案1】:

    我在 Twitter 上写了一个快速的答案 - 我想我也会在这里发布...抱歉重复发布!

    我喜欢这个/这些问题的子流,其中子流中的第一个信息包提供了您正在谈论的“上下文”。这可能会有所帮助:https://github.com/jpaulm/javafbp-websockets... HTH!

    PS 这种循环式的网络拓扑也是 Facebook 新的“Flux”技术的基础——参见 Jing Chen 的演讲,她将这种方法与 MVC 进行了比较:https://www.youtube.com/watch?v=nYkdrAPrdcw

    【讨论】:

      【解决方案2】:

      希望这可以将您推向正确的方向。我有一个类似的问题,我需要在异步微服务架构中执行同步操作。

      我如何解决它是使用观察者模式。我有 3 个组件;一个 http 服务器、一个回调服务器和一个计时器轮。与您类似的 http 服务器接收传入的请求,回调服务器接收异步处理后的总体结果以及将原始 http 上下文排队并协调对 http 请求的响应的计时器轮。

      当接收到一个传入的请求时,http 服务器会创建一个关联 id,将其附加到请求元数据中,将回调服务器 url 附加到请求元数据中,最后将请求和原始 http 上下文一起添加到计时器轮中。然后,http 服务器会像您的情况一样将请求传递给调度程序,并将消息发送到相关组件以进行异步处理。

      根据当前处理组件的执行结果,它将从元数据中检索回调 url 并将响应发送到回调服务器。在您的情况下,有 json 序列化或数据库写入可以执行此操作。然后回调服务器将提取附加的相关ID并获取相应的http上下文并写入响应。

      注意计时器轮中的每个计时器对象都有一个可配置的超时时间,这样如果异步处理延迟,它将超时并向相应 http 上下文的 http 客户端返回一条可配置的消息。

      【讨论】:

        猜你喜欢
        • 2013-08-30
        • 2023-04-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2018-10-11
        • 1970-01-01
        • 2018-10-07
        • 2014-05-09
        相关资源
        最近更新 更多