【问题标题】:rx: unfold array to multiple streamsrx:将数组展开到多个流
【发布时间】:2014-02-19 00:01:31
【问题描述】:

我有一个包含一个数组的流,其中每个元素都有一个 id。我需要将其拆分为每个 id 的流,当源流不再携带 id 时,该流将完成。

例如这三个值的输入流序列

[{a:1}, {b:1}]    [{a:2}, {b:2}, {c:1}]     [{b:3}, {c:2}]

应该返回三个流

a -> 1 2 |
b -> 1 2 3
c ->   1 2

其中 a 在第 3 个值完成,因为它的 id 已经消失,而 c 在第 2 个值被创建,因为它的 id 已经出现。

我在尝试groupByUntil,有点像

 var input = foo.share();              
 var output = input.selectMany(function (s) {
                        return rx.Observable.fromArray(s);
                }).groupByUntil(
                        function (s) { return s.keys()[0]; },
                        null,
                        function (g) { return input.filter(
                                function (s) { return !findkey(s, g.key); }
                        ); }
                )

因此,按 id 分组,并在输入流不再具有 id 时处理该组。这似乎可行,但输入的两种用法对我来说看起来很奇怪,就像使用单个流来控制 groupByUntil 的输入和组的处置时可能存在奇怪的顺序依赖性。

有没有更好的办法?

更新

这里确实存在一个奇怪的时间问题。 fromArray 默认使用 currentThread 调度程序,这将导致来自该数组的事件与来自输入的事件交错。然后在错误的时间(在处理来自先前输入的组之前)评估组的处置条件。

一种可能的解决方法是使用 fromArray(.., rx.Scheduler.immediate),这将使分组事件与输入保持同步。

【问题讨论】:

    标签: javascript reactive-extensions-js rxjs


    【解决方案1】:

    是的,我能想到的唯一选择是自己管理状态。我不知道它会更好。

    var d = Object.create(null);
    var output = input
        .flatMap(function (s) {
            // end completed groups
            Object
                .keys(d)
                .filter(function (k) { return !findKey(s, k); })
                .forEach(function (k) {
                    d[k].onNext(1);
                    d[k].onCompleted();
                    delete d[k];
                });
            return Rx.Observable.fromArray(s);
        })
        .groupByUntil(
            function (s) { return s.keys()[0]; },
            null,
            function (g) { return d[g.key] = new Rx.AsyncSubject(); });
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-01-15
      • 2015-10-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多