【问题标题】:RxJS interleaving merged observables (priority queue?)RxJS 交错合并的 observables(优先队列?)
【发布时间】:2018-02-10 17:05:08
【问题描述】:

更新

我想我已经找到了解决方案。我在这个视频中解释它。基本上,使用 timeoutWith 和一些 zip 技巧(在 zip 内)。

https://youtu.be/0A7C1oJSJDk


如果我有一个像这样的 observable:

A-1-2--B-3-4-5-C--D--6-7-E

我想把“数字”放在较低的优先级;它应该等到“字母”被填满(例如一组 2 个)或达到超时,然后它才能发出。也许下面的说明(期望的结果)可以提供帮助:

A------B-1-----C--D-2----E-3-4-5-6-7

我一直在尝试一些想法...其中一个:第一步是拆分该流(groupBy),一个包含字母,另一个包含数字...,然后“中间发生了一些事情” ...,最后这两个(子)流合并了。

这就是我想要弄清楚的“中间的东西”。

如何实现呢? RxJS(版本 5.5.6)甚至可以实现吗?如果不是,最近的那个是什么?我的意思是,我要避免的是让“数字”泛滥成灾,而没有足够的机会让“字母”得到及时处理。

也许我到目前为止所做的这段视频也可以澄清:

到目前为止,我的解决方案的问题(使用 .delay 延迟“数字”子流中的每个发射)不是最理想的,因为即使在“字符”(子)流结束后,它仍以缓慢的速度(10 秒)计时(没有完成——这里没有明确的界限——只是在不确定的时间内没有获得更多的价值)。我真正需要的是,一旦发生这种情况,让“数字”子流加快速度(到 2 秒)。

【问题讨论】:

    标签: merge observable priority-queue rxjs5 reactive


    【解决方案1】:

    不幸的是,我不太了解RxJs5,我自己使用xstream(由RxJS5 的一位贡献者编写),这在运算符数量方面要简单一些。

    我制作了以下示例: (注意:运算符与 Rx5 中的几乎相同,主要区别在于 flatten 或多或少类似于 switch,但似乎处理同步流的方式不同。

    const xs = require("xstream").default;
    
    const input$ = xs.of("A",1,2,"B",3,4,5,"C","D",6,7,"E");
    
    const initialState = { $: xs.never(), count: 0, buffer: [] };
    const state$ = input$
        .fold((state, value) => {
            const t = typeof value;
            if (t === "string") {
                return {
                    ...state,
                    $: xs.of(value),
                    count: state.count + 1
                };
            }
            if (state.count >= 2) {
                const l = state.buffer.length;
                return {
                    ...state,
                    $: l > 0 ? xs.of(state.buffer[0]) : xs.of(value) ,
                    count: 0,
                    buffer: state.buffer.slice(1).concat(value)
                };
            }
            return {
                ...state,
                $: xs.never(),
                buffer: state.buffer.concat(value),
            };
        }, initialState);
    
    
    xs
        .merge(
            state$
            .map(s => s.$),
            state$
            .last()
            .map(s => xs.of.apply(xs, s.buffer))
        )
        .flatten()
        .subscribe({
            next: console.log
        });
    

    这给了我你正在寻找的结果。

    它的工作原理是将流折叠起来,查看值的类型并根据它发出一个新流。当您因为没有发送足够的信件而需要等待时,我会发出一个 emptystream(不发出任何值、没有错误、没有完成)作为“占位符”。

    您可以发出类似

    的东西,而不是发出这个空流
    xs.empty().endsWith(xs.periodic(timeout)).last().mapTo(value):
    // stream that will emit a value only after a specified timeout.
    // Because the streams are **not** flattened concurrently you can
    // use this as a "pending" stream that may or may not be eventually
    // consumed
    

    其中 value 是最后收到的数字,以便实现与超时相关的条件,但是您需要在 Rx 中使用 Subject 或在 xstream 中使用 xs.imitate 引入某种自反性,因为您需要通知您的状态您的“待处理”流已被消耗,这使得通信是双向的,而流/observables 是单向的。

    【讨论】:

      【解决方案2】:

      这里的关键是使用 timeoutWith,当“事件”开始时切换到更积极的“起搏器”。在这种情况下,“事件”是“在更高优先级的流中检测到空闲”。

      视频:https://youtu.be/0A7C1oJSJDk

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2013-10-28
        • 2022-11-15
        • 1970-01-01
        • 2011-12-20
        • 1970-01-01
        • 2017-04-11
        相关资源
        最近更新 更多