【问题标题】:rxjs: what is the best way to dynamically add values to an observable stream?rxjs:向可观察流动态添加值的最佳方法是什么?
【发布时间】:2020-07-29 10:19:27
【问题描述】:

作为学习 rxjs 的一部分,我一直在使用 create 方法 of、from、interval 等来测试节流阀和 deboucne 等,我一直在使用 fromevent 创建流。

现在我有了一个真实的用例,我需要动态地将值添加到一个空的可观察流中。我找不到任何关于如何最好地做到这一点的例子,而不是使用上面的创建方法。目前,我使用 BehaviourSubject 使用 next() 将项目动态添加到流中。这是向流中动态添加新项目的最佳/首选方式吗?

例如

import { BehaviorSubject, timer } from 'rxjs';
import { tap, mapTo, concatMap, } from 'rxjs/operators';

const subject = new BehaviorSubject(1);
const example = subject.pipe(
  concatMap(ev => timer(200).pipe(mapTo(ev))),
  tap((ev) => console.log(ev))
)
example.subscribe();

// add a flurry of values dynamically
subject.next(2);
subject.next(3);
subject.next(4);

// some time later add some more
setTimeout(function(){ 
  subject.next(5);
  subject.next(6);
  subject.next(7);
}, 5000);

https://stackblitz.com/edit/rxjs-behaviorsubject-simpleexample-gyrtw8?file=index.ts

谢谢

【问题讨论】:

  • 这绝对是一种实现您想要的方式,但如果知道您的function 实际上是什么会很有趣,我的意思是嵌入在setTimeout 中的function。如果setTimeout 模拟某种基于callback 的机制,则可能有其他方法。

标签: rxjs stream


【解决方案1】:

如果您有一个奇怪的自定义逻辑来添加应该在 Observable 中发出的值,您可以创建自己的(而不是使用 fromEvent, of, from, ...):


const myObservable = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);

  setTimeout(() => {
    subscriber.next(4);
    subscriber.next(5);


    setTimeout(() => {
       subscriber.next(6);
    }, 2000);
  }, 1000);
});

但是,rxjs 的创建功能应该可以满足您 99% 的需求。 上面的代码也可以这样写:

concat(
  of(1,2,3),
  of(4,5).pipe(
    delay(1000)
  ),
  of(6).pipe(
    delay(2000)
  )
)

UPD:关于主题

Subject 也是 Observable,因此在您的情况下使用 Subject 是适用的,但可能不是最佳选择。主题的想法是可以有多个订阅者(使用主题值的订阅者),但我不确定这是您的情况(顺便说一下 - 您可以提供您的真实示例来帮助我们了解你想要达到的目标)

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-12-02
    • 2012-01-07
    • 1970-01-01
    • 2015-03-12
    • 2014-12-17
    • 1970-01-01
    • 2011-01-25
    相关资源
    最近更新 更多