【问题标题】:Is it generally OK to use a Reactive Streams Processor as an event bus?将反应式流处理器用作事件总线通常可以吗?
【发布时间】:2017-05-03 21:20:55
【问题描述】:

我开始学习反应式流是因为我对使用 RxJava 替代更传统的事件总线的新趋势感到好奇。 This blog post 是对如何完成此操作的典型描述。如果我理解正确的话,RxJava 1.x 并不是严格意义上的 Reactive Streams 的实现,但它非常相似。 2.0 版包含一些兼容的类,或者至少通过了 TCK,因此此代码的更新版本可能看起来有些不同。

public class UserLocationModel {

  private PublishSubject<LatLng> subject = PublishSubject.create();

  public void setLocation(LatLng latLng) {
    subject.onNext(latLng);
  }

  public Observable<LatLng> getUserLocation() {
    return subject;
  }
}

在 Reactive Streams 术语中,我认为 subjectProcessor,它既是 Publisher 又是 Subscriber

问题在于,在未订阅任何内容的 Subscriber 上调用 onNext 似乎违反了 Reactive Streams 规范,尤其是 rule 1.9

这仅仅是一个实现细节吗?我认为您通常不能依赖此与兼容的 Reactive Streams 实现一起工作,这是否正确?

【问题讨论】:

    标签: rx-java event-bus reactive-streams


    【解决方案1】:

    标准 RxJava 2 的Subjects 和Processors 是宽松的,因此您不必在调用其他方法之前对它们调用onSubscribe。这部分是由于传统性,因为 1.x 主题没有onSubscribe,部分是由于 RxJava 2 处理器不会通过选择协调 Subscriber 端和 Publisher 端之间的请求,因此Subscription 没用。

    如果您订阅了 RxJava Processor 任何符合 RS 标准的 Publisher,它们似乎会请求 Long.MAX_VALUE 并尽可能多地中继信号。如果您将符合 RS 的 Subscriber 订阅到 RxJava Processors,它们将尊重那些 Subscribers 的背压并且永远不会溢出它们,但是,缺少请求可能会导致单个 MissingBackpressureException 被发出并且 @ 987654336@“扔”了。在 extensions library 中有一个自定义的 Publisher 来执行坐标请求。

    我认为您通常不能依赖此与兼容的 Reactive Streams 实现一起工作,这是否正确。

    规范中没有任何内容,因此没有在 TCK 中测试如果 Processor 没有收到 onSubscribe 调用但它需要它会发生什么,因此,我认为这已成为实现细节。

    这里有两个更大的问题:

    1. 发明了主题以将命令式世界与反应式世界连接起来,并在 GUI 案例和非背压案例中作为事件的多播器很好地工作。在响应式-响应式多播中,它们是更好、更直接的替代方案,例如 publish(Function)
    2. 在事件总线中思考是一种倒退,因为您通过在单个“轨道”上铲入和排出事件来创建单个阻塞点。相比之下,响应式设计更倾向于单独且通常独立的流,其中每个流都可以根据需要在线程之间跳转,并且可能直到最后一刻才避开主线程。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-01-25
      • 2013-02-07
      • 2015-07-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多