【问题标题】:Reactive streams with reactive side-effects具有反应性副作用的反应性流
【发布时间】:2019-03-22 00:47:47
【问题描述】:

我认为这将与大多数反应式实现类似地完成,所以我没有在这里指定任何特定的库和语言。

对流进行反应性副作用的正确方法是什么(不确定这里的正确术语是什么)?

假设我们有一个Players 流。我们希望将其转换为只有ids 的流。但除此之外,我们还想根据 id 做一些额外的反应式处理。

这是一个不太优雅的方式,但实现这一点的惯用方式是什么:

Observable<Player> playerStream = getPlayerStream();
return playerStream
      .map(p -> p.id)
      .flatmap(id -> {
           Observable<Result> weDontCare = process(id);
           // Now the hacky part
           return weDontCare.map(__ -> id);
      })

这样好吗?似乎没有那么优雅。

【问题讨论】:

标签: reactive-programming rx-java2 project-reactor


【解决方案1】:

我不知道 RxJava,但是您也标记了项目反应器,并且有两种方法可以使用反应器来执行此操作,具体取决于您希望副作用如何影响您的流。如果您想等待副作用发生以便处理错误等(我的首选方式),请使用 delayUntil:

return getPlayerStream()
  .map(p -> p.id)
  .delayUntil(id -> process(id));

如果您希望 id 直接通过而无需等待,而是执行“即发即弃”风格的副作用,您可以使用 doOnNext:

return getPlayerStream()
  .map(p -> p.id)
  .doOnNext(id -> process(id).subscribe());

【讨论】:

    【解决方案2】:
    Observable<Player> playerStream = getPlayerStream();
        return playerStream
              .map(p -> p.id)
              .doOnEach(id -> { //side effect with id
                Observable<Result> weDontCare = process(id);
                weDontCare.map(__ -> id);
              })
    

    【讨论】:

    • 首先,RxJava 中没有tap 运算符。其次,switchMap 可能会为您丢失项目,因为它会不断替换来自 playerStream 的新项目的内部 observables,永远不会让之前的地图将 id 重新放入主流程。
    • 但是在例子中他不需要那个
    • 再次阅读帖子。 OP 希望对每个项目进行异步调用,但仍使用原始 playerStream 中的 id 继续主流程。它没有说 OP 在最后没有得到大多数 id 很好,switchMap 会很高兴地松动。
    • 就这样
    • doOnEach 无法返回任何内容,也不会执行 weDontCare
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2021-02-21
    • 2015-09-13
    • 2018-07-20
    • 2019-04-10
    • 2019-06-22
    • 2016-11-12
    • 2016-03-24
    相关资源
    最近更新 更多