【问题标题】:RxJS Observable: repeat using count and then using notifierRxJS Observable:重复使用计数,然后使用通知器
【发布时间】:2018-08-28 09:11:19
【问题描述】:

我有一个发出 Either = Success | Failure 的 Observable:

import { Observable } from 'rxjs';

type Success = { type: 'success' };
type Failure = { type: 'failure' };

type Either = Success | Failure;

const either$ = new Observable<Either>(observer => {
    console.log('subscribe');
    observer.next({ type: 'failure' });
    observer.complete();
    return () => {
        console.log('unsubscribe');
    };
});

我想让用户在 Observable 完成并且最后一个值为 Failure 时“重试”observable。

retry{,When} 运算符在这里没有帮助,因为它们在 error 频道上处理错误。因此,我认为我们应该改为考虑 repeat。)

我想:

  • 重复 Observable n 次,直到最后一个值不是 Failure
  • 然后,允许用户手动重复。当重复通知 observable (repeat$) 发出时,再次重复 observable。

例如:

// subscribe
// next { type: 'failure' }
// unsubscribe

// retry 2 times:

// subscribe
// next { type: 'failure' }
// unsubscribe

// subscribe
// next { type: 'failure' }
// unsubscribe

// now, wait for repeat notifications…
// on retry notification:

// subscribe
// next { type: 'failure' }
// unsubscribe

【问题讨论】:

  • 我认为其中的主要缺陷是您想手动重复该链,因为一个subscribe() 基本上意味着您将拥有一个订阅。如果您允许用户手动重试,那么您将拥有一个包含多个订阅的链,这不是您通常想要实现的。我认为使用 window 运算符可以工作,但我建议在每个 repeat$ 上调用新的 subscribe
  • 我的函数需要返回一个Observable,并且订阅是在我的控制之外创建的。

标签: rxjs observable repeat


【解决方案1】:

我想不出更简单的东西,但代码可以满足你的要求。

https://stackblitz.com/edit/typescript-yqcejk

defer(() => {
   let retries = 0;

   const source = new BehaviorSubject(null);

   return merge(source, repeat$.pipe(filter(() => retries <= MAX_RETRIES)))
       .pipe(
           concatMapTo(either$),
           tap(value => {
               const action = value as Either;
               if (action.type === 'failure') {
                   if (retries < MAX_RETRIES) {
                       retries += 1;
                       source.next(null);
                   }
               } else {
                   retries = 0;
               }
           })
       )
}).subscribe(console.log);

我不得不手动计算重试次数。

代码有两个事件源source 用于自动重试和repeat$ 用于用户重试。所有事件都使用concatMapTo 映射到either$。作为副作用,我们要么next() 重试,要么什么都不做,等待用户重试。

使用filter(() =&gt; retries &gt;= MAX_RETRIES) 禁止用户重试,直到达到MAX_RETRIES 计数。

【讨论】:

  • 太棒了,谢谢。我注意到 unsubscribes 在所有重试完成之前不会发生。你知道这是为什么吗?另外,有什么理由使用concatMapTo 而不是mergeMapTo
  • unsubscribes 最后发生,因为我认为运行是同步的,您可以使用 observeOn(asyncScheduler) 使其异步运行,然后它完全按照您的示例取消订阅。至于concatMapTomergeMapTo - 我选择concatMapTo 是因为它更安全,可以防止多个either$ Observables 同时运行...
猜你喜欢
  • 1970-01-01
  • 2020-05-07
  • 1970-01-01
  • 2020-04-15
  • 1970-01-01
  • 1970-01-01
  • 2021-08-31
  • 2017-12-13
  • 1970-01-01
相关资源
最近更新 更多