【问题标题】:Akka Reactive Streams purpose of the ProcessorAkka Reactive Streams 处理器的用途
【发布时间】:2016-02-01 23:24:47
【问题描述】:

我正在尝试理解 akka 中的 Reactive Streams。我已经阅读了这个博客http://bryangilbert.com/blog/2015/02/04/akka-reactive-streams/,我想我对它的工作原理有了基本的了解。然而,我不明白的是处理器在这个概念中的目的。它是干什么用的? 订阅者请求 N-Objects 和发布者使用 onNext() 发送它们还不够吗?

【问题讨论】:

    标签: stream akka akka-stream reactive-streams


    【解决方案1】:

    假设您有一个真正简单的流程,只有一个 Source(发布者)和一个 Sink(订阅者)。您将这两者连接起来,接收器订阅发布者并开始请求数据,数据流向接收器。在此示例中,您真正需要的只是发布者和订阅者。但是在这个例子中,数据从源到接收器的过程中没有发生任何变化。它没有以任何方式转换,因此不是很有趣也不是很有用。

    处理器结合了发布者和订阅者接口,因此可以同时充当这两个角色。处理器旨在嵌入源和接收器之间的处理流并转换数据。如果我将一个放入之前的源/接收器示例中,数据流以及谁订阅了哪些变化。现在接收器订阅该处理器,而处理器又订阅源。接收器向处理器请求元素,处理器将请求向上传播到源。当有元素满足需求时,它还负责将元素向下游推送到水槽。这就是为什么它必须实现这两个接口,因为它必须同时扮演这两个角色。

    随着您添加的每个处理步骤,例如mapfilter,您正在添加另一个可以处理背压的位置。这些步骤不是数据的起始点(源)或目的地(汇)。他们要做的就是接收数据并对它做一些事情或改变它的流动并将元素发送到下游以满足需求。因为他们需要能够链接到他们需要发布和订阅功能的任何链,这就是处理器存在的原因。

    【讨论】:

    • 所以基本上它所做的只是介于两者之间,在必要时更改数据并增加一些数据?
    • @Rise,处理器真正实现了关于反应流的一切。 Streams 是关于操作、数据和修改流的全部内容,如果没有处理器站在这些阶段,您将拥有的只是一个源 -> 汇流,中间没有任何东西。通过这种小型的自包含 Pub/Sub 配对,它允许您绑定到现有流程并进行修改,并且仍然可以处理背压,甚至可能改变背压的影响(通过类似 buffer 的方式)
    • 但是没有处理器的同样的事情仍然是反应性的?接收器仍然会询问它可以处理的数据量,不是吗?
    • @Rise,是的,它仍然是反应式的(虽然不是很有趣的流程)
    猜你喜欢
    • 1970-01-01
    • 2018-03-08
    • 1970-01-01
    • 1970-01-01
    • 2018-01-14
    • 1970-01-01
    • 2023-03-25
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多