【问题标题】:Create a stream of consecutive values that pass a filter创建通过过滤器的连续值流
【发布时间】:2017-11-29 00:30:37
【问题描述】:

假设我有一串数字

---1-----1----1----0----0----1----1----0--->

我想得到一个新的数组流,其中包含像这样的连续 1

---[1,1,1]---[1,1]--->

我虽然使用了 scan 函数,但它只发出一个值,我阅读了有关 bufferToggle 的信息,但文档仅将它与定时的 observables 一起使用。有没有这样的功能?

【问题讨论】:

    标签: functional-programming rxjs reactive-programming reactivex


    【解决方案1】:

    一种可能的方法是将scanpairwise 运算符一起使用。

    通过成对使用,您可以将N-1th 发射与Nth 进行比较。

    console.clear();
    var source = Rx.Observable.of(1, 1, 1, 0, 0, 1, 1, 0);
    
    source
      .concat(Rx.Observable.of(0)) // for the case when source completes with 1
      .scan(function(acc, x) {
        // reset accumulator to an empty array when a 0 is encountered
        if (x === 0) {
          return [];
        } else {
          return acc.concat([x]);
        }
      }, [])
      .pairwise()
      // filter when the accumulated value goes from a length greater than 0
      // to 0 then you know you've hit 0
      .filter((pair) => pair[0].length > 0 && pair[1].length === 0)
      // take the first element of the pair since it has the result you're interested in
      .map((pair) => pair[0])
      .subscribe(console.log)
    <script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.2/Rx.min.js"></script>

    【讨论】:

    • 不错的解决方案。不过,我会在.scan 之前添加一个.concat(Observable.of(0)),否则如果源流以 1 结尾,则不会发出结果,例如源Observable.of(1, 1, 1) 根本不会发出任何东西。
    • 好消息@ehrencrona。已更新。
    【解决方案2】:

    可以将源本身用作bufferToggle 的信号 - 它使用可观察对象来控制缓冲区的打开和关闭。

    但是,您必须注意订阅源的顺序。特别是,用于指示缓冲区打开和关闭的 observable 必须在bufferToggle observable 订阅之前订阅。

    这可以使用publishSubject 来完成,如下所示:

    const source = Rx.Observable.of(1, 1, 1, 0, 0, 1, 1, 0);
    const published = source.publish();
    
    const signal = new Rx.Subject();
    published.subscribe(signal);
    
    const buffered = published.bufferToggle(
      signal
        .startWith(0)
        .pairwise()
        .filter(([prev, last]) => (prev === 0) && (last === 1)),
      () => signal.filter(value => value === 0)
    );
    
    buffered.subscribe(value => console.log(value));
    published.connect();
    .as-console-wrapper { max-height: 100% !important; top: 0; }
    <script src="https://unpkg.com/rxjs@5/bundles/Rx.min.js"></script>

    Subject 是必需的,因为每次发出一个值时,都会订阅用于表示缓冲区关闭的 observable。

    我应该补充一点,我提交这个答案不是因为我认为它比其他答案更好,而是为了表明可以使用 buffer (和 window ) 具有从源派生的信号的运算符。必须注意一些,仅此而已。使用 RxJS,通常有不止一种方法可以做某事。

    此外,创建re-usable, lettable/pipeable operator 来帮助解决这些情况也很容易。

    【讨论】:

      猜你喜欢
      • 2013-11-10
      • 2017-02-11
      • 2016-02-20
      • 1970-01-01
      • 2021-12-29
      • 2017-07-29
      • 2021-01-04
      • 1970-01-01
      • 2019-12-27
      相关资源
      最近更新 更多