【问题标题】:Why does using share block an observable using withLatestFrom?为什么使用 share 会阻止使用 withLatestFrom 的 observable?
【发布时间】:2021-05-19 23:08:05
【问题描述】:

问题

我有以下运算符:

  const prepare = (value$: Observable<string>) =>
    value$.pipe(
      tap((x) => console.log("prepare: ", x)),
      share()
    );

  const performTaskA = (removed$: Observable<string>) =>
    removed$.pipe(tap((x) => console.log("taskA: ", x)));

  const performTaskB = (removed$: Observable<string>) =>
    removed$.pipe(
      tap((x) => console.log("taskB 1: ", x)),
      withLatestFrom(otherValue$),
      tap((x) => console.log("taskB 2: ", x))
    );

我这样称呼他们:

  const prepared$ = value$.pipe(prepare);
  const taskADone$ = prepared$.pipe(performTaskA);
  const taskBDone$ = prepared$.pipe(performTaskB);

  merge(taskADone$, taskBDone$).subscribe();

产生以下输出:

prepare:  TEST 
taskA:  TEST 
taskB 1:  TEST

请注意,taskB 2 尚未被记录 - 似乎 taskBDone 可观察对象已停止在 performTaskB 中的 withLatestFrom(otherValue$)

如果prepare 中的share 被删除,observable 不会停止,但它(不出所料)会导致prepare 执行两次,这是我不希望的。

问题

  1. 我怎样才能同时执行performTaskAperformTaskBprepare 只执行一次?
  2. 鉴于下面的调试说明,为什么share 会导致发射序列发生变化?

演示
分享(如上):https://codesandbox.io/s/so-share-with-latest-from-with-share-rtyex?file=/src/index.ts:663-853
无分享:https://codesandbox.io/s/so-share-with-latest-from-no-share-p702e

转到右侧的“测试”选项卡,确保控制台可见,然后单击“播放”按钮。

部分解释
调试withLatestFrom 很明显,当源(removed$) 发射时,readyfalse here,这会阻止发射。

这是因为当 share 存在时,输入 (otherValue$) 订阅在源之后发出,因此尚未设置 ready。 (或者是share导致源提前发射?)

但是当share 被删除时,输入订阅发出源之前发出,这意味着ready 通过herehere 设置为真,因此withLatestFrom 发出为预计。

【问题讨论】:

  • performTaskB 中,您直接在withLatestFrom 中使用value$ - 而不是prepare 返回的共享版本。但是removed$ 参数实际上是共享的value$,所以我们在这个函数中使用了两个不同版本的value$。这是故意的吗?
  • 这是故意的,因为它显示了我遇到的问题。但是我已经编辑了问题,现在withLatestFrom 引用了一个单独的变量,该变量更接近我的真实示例并且仍然显示问题。
  • 谢谢-我认为解释可能与您今天的其他问题相似但不相同。在这种情况下,您传递的“TEST”可观察对象(以及 otherValue$)是什么?
  • 我通过TestScheduler/marbles 将它们作为冷观测值传递 - 请参阅codesandbox.io/s/… - 这在某种程度上模仿了我的真实世界应用程序,其中value$Subject$otherValue 已连接到 ngrx 选择器。无论哪种方式,我在这里和我的应用程序中都有同样的问题。谢谢。
  • @backtick 我已经深入研究了这个问题并相应地更新了问题。

标签: rxjs


【解决方案1】:

我尝试运行你的代码,但我无法让它像你的那样挂起。我怀疑这与您的测试框架如何管理可观察对象有关。我无法复制它。

我确实注意到您所写的内容存在一些内在的交错/排序问题。共享同步 observables 会带来一些问题,例如第一个观察者同步观察所有源值以及它的完成第二个观察者可以订阅之前。即使它们应该“同时”订阅,也会发生这种情况。

使用事件循环排序

我在这里写的对我有用:

function init(
  value$: Observable<string>,
  otherValue$: Observable<string>
) {
  const prepare = pipe(
    tap(x => console.log("prepare: ", x)),
    delay(0),
    share()
  );

  const performTaskA = pipe(
    tap(x => console.log("taskA: ", x))
  );

  const performTaskB = pipe(
    tap(x => console.log("taskB 1: ", x)),
    withLatestFrom(otherValue$),
    tap(x => console.log("taskB 2: ", x))
  );

  const prepared$ = value$.pipe(prepare);

  merge(
    prepared$.pipe(performTaskA), 
    prepared$.pipe(performTaskB)
  ).subscribe();
}

init(of("a"), of("b"));

对我来说,这会将其打印到控制台:

prepare: a
taskA: a
taskB 1: a
taskB 2: ["a","b"]

您会注意到在prepare 中调用了 delay(0)。否则,由于共享的 observable 已同步完成,因此会调用两次 prepare。 Delay(0) 只是尽快将下一个调用放入事件队列中。

这不是最好的解决方案。这是一个黑客。最佳解决方案取决于如何使用它。大多数时候,shareReplay(1) 完成了这项工作。如果你在这里使用它,那就行了。

使用发布/连接进行排序

否则,您可以发布和连接以确保订单符合您的预期。

发布/连接可让您在源正式启动/连接/订阅之前设置对源的所有订阅。这可以确保在幕后,所有回调都在任何事情发生之前就位。这是确保完全同步的 observable 可以共享其值的唯一方法。

function init(
  value$: Observable<string>,
  otherValue$: Observable<string>
) {
  const prepare = pipe(
    tap(x => console.log("prepare: ", x)),
    share()
  );

  const performTaskA = pipe(
    tap(x => console.log("taskA: ", x))
  );

  const performTaskB = pipe(
    tap(x => console.log("taskB 1: ", x)),
    withLatestFrom(otherValue$),
    tap(x => console.log("taskB 2: ", x))
  );

  const prepared$ = publish()(value$.pipe(prepare));

  merge(
    prepared$.pipe(performTaskA), 
    prepared$.pipe(performTaskB)
  ).subscribe();

  prepared$.connect();
}

【讨论】:

  • 非常感谢您的回答,并对延迟回复表示歉意。我很欣赏这个解释,但是第一个解决方案并不代表我的场景(输入的 observables 没有完成),第二个解决方案仍然表现出与我最初遇到的相同的问题 - 请参阅 codesandbox.io/s/… - 从测试选项卡运行测试在右侧并观察右下角的控制台输出。我说我已经找到了一种不同的方法,所以没有义务进一步研究这个!
  • 我怀疑这更接近于您的测试框架如何管理 Observables,而不是您对 RxJS 的理解存在问题。不管怎样,我很高兴你已经想出办法了:)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-10-21
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多