【问题标题】:Subjects and resilience logic workflow主题和弹性逻辑工作流
【发布时间】:2016-07-27 21:50:01
【问题描述】:

我希望更好地了解在与弹性运算符(即retryretryWhen)一起使用时 Subjects 的预期行为。

以下代码示例与它们的 JSBin 对应部分(在示例链接中找到)略有不同,因为我使用箭头函数和类型以便于使用,这是使用版本 4.0.0 - 4.0.7

我的预期弹性行为可以用以下example表示:

Rx.Observable
  .interval(1000)
  .flatMap( (count:number) => { 
    return count === 4 ? Rx.Observable.throw('Break') : Rx.Observable.return(count);
  })
  .retry()
  .take(5);

 Output 
 // 0
 // 1
 // 2
 // 3 
 // 0 <-- Retry means we start again from scratch (expected)

到目前为止,一切都是一致的,即在第四次通知发生错误之后,整个流从头开始重新启动(无状态架构获胜)。

现在,如果我们添加一个多播操作符并添加一个底层主题(在我的例子中是一个缓冲区为 1 的 ReplaySubject),example

const consumer : Rx.Observable<number> = Rx.Observable
  .interval(1000)
  .flatMap( (count:number) => { 
    return count === 4 ? Rx.Observable.throw('Break') : Rx.Observable.return(count);
  })
  .shareReplay(1) /* multicast(new Rx.ReplaySubject(1)).refCount() */
  .retry()
  .take(5);

const firstSubscriber : Rx.Disposable = consumer.subscribe( (next:number) => {
   console.log('first subscriber: ' + next);
});

setTimeout(() => {
   firstSubscriber.dispose(); /* Lets start fresh in that refCount === 0 */
   const secondSubscriber : Rx.Disposable = consumer.subscribe( (next) => {
      console.log('second subscriber: ' + next);
   });
}, 5000 );

Output (before error is thrown)
// "first subscriber: 0"
// "first subscriber: 1"
// "first subscriber: 2"
// "first subscriber: 3"
Output (after error is thrown)
// "first subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3"
// "second subscriber: 3" 

快速查看Subject 可以确定何时出现错误,主题被标记为 inError,并且每个未来的订阅者都会收到最后的通知(第 46 行),并且会在调用 onError 之后立即(第 50 行)。

那么这会给我们带来什么影响?在我看来,当弹性运算符跟随任何其他包含主题(shareReplay、发布等)的运算符时,我认为您永远无法使用弹性运算符。

在这一点上,我认为这种设计成功的唯一方法是确保发生错误并且节点已被处置,每当使用主题时,都需要创建一个新主题(并且在兔子身上)我们开始去的洞)?

multicast 可以带工厂/主题选择器:

.multicast( () => new Rx.ReplaySubject(1), (source:Rx.ConnectableObservable) => source );

查看source,如果您使用subjectSelector 而不是直接为每个新订阅传递主题,那么将调用subjectSelector 并创建一个新的ConnectableObservable(第11 行)。

此时我不确定是否共享(通过一些缓存)和处置(当检测到错误时)主题是否真的会给订阅者多播?

在达到这一点时,我还编写了一个 RecoverableReplaySubject,我在处理时取出了错误状态,这更多是为了测试,并且希望 RxJS 团队有充分的理由加入这个工作流。

我们将不胜感激有关此主题的任何指导和经验。

谢谢

【问题讨论】:

    标签: javascript typescript reactive-programming rxjs frp


    【解决方案1】:

    shareReplay 主题在错误和完成方面具有不同的语义。例如,即使底层的 observable 已经完成(refCount == 0),shareReplay 也不会完成,因此进一步调用它会产生(重放)过去的值。参照。 jsbin(shareReplay) 与 jsbin(share)。

    var source = Rx.Observable
          .interval(100)
          .take(5)
          .shareReplay()
    
    var first = source.subscribe( function(next) {
      console.log('first subscriber: ' + next);
    });
    
    setTimeout(function() {
    //  first.dispose();
      var second = source.subscribe( function(next) {
      console.log('second subscriber: ' + next);
    });
    
    }, 1000 );
    

    否则,您将找到关于 shareReplay(与您的问题的讨论)与其他运营商的行为的解释:

    提出的解决方案正是为多播操作员使用工厂函数。无论如何,尝试您的新设计并查看它是否有效应该不会太难。

    【讨论】:

      猜你喜欢
      • 2021-07-05
      • 1970-01-01
      • 1970-01-01
      • 2023-03-30
      • 2013-01-10
      • 2017-08-28
      • 1970-01-01
      • 1970-01-01
      • 2020-12-02
      相关资源
      最近更新 更多